diff options
Diffstat (limited to '')
-rw-r--r-- | src/nxt_process.c | 1 | ||||
-rw-r--r-- | src/nxt_process.h | 1 | ||||
-rw-r--r-- | src/nxt_runtime.c | 3 | ||||
-rw-r--r-- | src/nxt_unit.c | 296 |
4 files changed, 93 insertions, 208 deletions
diff --git a/src/nxt_process.c b/src/nxt_process.c index 0b3aa40f..9bfae395 100644 --- a/src/nxt_process.c +++ b/src/nxt_process.c @@ -146,7 +146,6 @@ nxt_process_child_fixup(nxt_task_t *task, nxt_process_t *process) } nxt_port_mmaps_destroy(&p->incoming, 0); - nxt_port_mmaps_destroy(&p->outgoing, 0); } nxt_runtime_process_loop; diff --git a/src/nxt_process.h b/src/nxt_process.h index 4076cefc..ecd813e2 100644 --- a/src/nxt_process.h +++ b/src/nxt_process.h @@ -92,7 +92,6 @@ typedef struct { nxt_int_t use_count; nxt_port_mmaps_t incoming; - nxt_port_mmaps_t outgoing; nxt_thread_mutex_t cp_mutex; diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c index c25b93cc..5f4b3e58 100644 --- a/src/nxt_runtime.c +++ b/src/nxt_runtime.c @@ -1377,7 +1377,6 @@ nxt_runtime_process_new(nxt_runtime_t *rt) nxt_queue_init(&process->ports); nxt_thread_mutex_create(&process->incoming.mutex); - nxt_thread_mutex_create(&process->outgoing.mutex); nxt_thread_mutex_create(&process->cp_mutex); process->use_count = 1; @@ -1397,10 +1396,8 @@ nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process) nxt_assert(process->registered == 0); nxt_port_mmaps_destroy(&process->incoming, 1); - nxt_port_mmaps_destroy(&process->outgoing, 1); nxt_thread_mutex_destroy(&process->incoming.mutex); - nxt_thread_mutex_destroy(&process->outgoing.mutex); nxt_thread_mutex_destroy(&process->cp_mutex); /* processes from nxt_runtime_process_get() have no memory pool */ diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 7fb2826d..154fd480 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -70,8 +70,6 @@ static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get( nxt_unit_ctx_t *ctx); static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws); static void nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws); -static nxt_unit_process_t *nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, - nxt_unit_recv_msg_t *recv_msg); static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx); static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf); static int nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req, @@ -114,7 +112,6 @@ static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf); static int nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id); static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, - nxt_unit_process_t *process, nxt_port_mmap_header_t *hdr, void *start, uint32_t size); static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid); @@ -137,7 +134,6 @@ static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst, nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port); nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port); -nxt_inline nxt_unit_process_t *nxt_unit_port_process(nxt_unit_port_t *port); static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); static void nxt_unit_remove_port(nxt_unit_impl_t *lib, @@ -179,7 +175,6 @@ struct nxt_unit_mmap_buf_s { nxt_port_mmap_header_t *hdr; nxt_unit_request_info_t *req; nxt_unit_ctx_impl_t *ctx_impl; - nxt_unit_process_t *process; char *free_ptr; char *plain_ptr; }; @@ -197,7 +192,6 @@ struct nxt_unit_recv_msg_s { uint32_t size; int fd; - nxt_unit_process_t *process; nxt_unit_mmap_buf_t *incoming_buf; }; @@ -217,8 +211,6 @@ struct nxt_unit_request_info_impl_s { uint32_t stream; - nxt_unit_process_t *process; - nxt_unit_mmap_buf_t *outgoing_buf; nxt_unit_mmap_buf_t *incoming_buf; @@ -296,6 +288,23 @@ struct nxt_unit_ctx_impl_s { }; +struct nxt_unit_mmap_s { + nxt_port_mmap_header_t *hdr; + + /* of nxt_unit_read_buf_t */ + nxt_queue_t awaiting_rbuf; +}; + + +struct nxt_unit_mmaps_s { + pthread_mutex_t mutex; + uint32_t size; + uint32_t cap; + nxt_atomic_t allocated_chunks; + nxt_unit_mmap_t *elts; +}; + + struct nxt_unit_impl_s { nxt_unit_t unit; nxt_unit_callbacks_t callbacks; @@ -315,6 +324,9 @@ struct nxt_unit_impl_s { nxt_queue_t contexts; /* of nxt_unit_ctx_impl_t */ + nxt_unit_mmaps_t incoming; + nxt_unit_mmaps_t outgoing; + pid_t pid; int log_fd; int online; @@ -339,31 +351,11 @@ struct nxt_unit_port_impl_s { }; -struct nxt_unit_mmap_s { - nxt_port_mmap_header_t *hdr; - - /* of nxt_unit_read_buf_t */ - nxt_queue_t awaiting_rbuf; -}; - - -struct nxt_unit_mmaps_s { - pthread_mutex_t mutex; - uint32_t size; - uint32_t cap; - nxt_atomic_t allocated_chunks; - nxt_unit_mmap_t *elts; -}; - - struct nxt_unit_process_s { pid_t pid; nxt_queue_t ports; /* of nxt_unit_port_impl_t */ - nxt_unit_mmaps_t incoming; - nxt_unit_mmaps_t outgoing; - nxt_unit_impl_t *lib; nxt_atomic_t use_count; @@ -515,6 +507,9 @@ nxt_unit_create(nxt_unit_init_t *init) goto fail; } + nxt_unit_mmaps_init(&lib->incoming); + nxt_unit_mmaps_init(&lib->outgoing); + return lib; fail: @@ -640,6 +635,9 @@ nxt_unit_lib_release(nxt_unit_impl_t *lib) nxt_unit_port_release(lib->shared_port); } + nxt_unit_mmaps_destroy(&lib->incoming); + nxt_unit_mmaps_destroy(&lib->outgoing); + free(lib); } } @@ -807,7 +805,6 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) rc = NXT_UNIT_ERROR; recv_msg.fd = -1; - recv_msg.process = NULL; port_msg = (nxt_port_msg_t *) rbuf->buf; cm = (struct cmsghdr *) rbuf->oob; @@ -967,10 +964,6 @@ fail: nxt_unit_mmap_buf_free(recv_msg.incoming_buf); } - if (recv_msg.process != NULL) { - nxt_unit_process_release(recv_msg.process); - } - if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) { #if (NXT_DEBUG) memset(rbuf->buf, 0xAC, rbuf->size); @@ -1109,14 +1102,6 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) req->content_buf = req->request_buf; req->content_buf->free = nxt_unit_sptr_get(&r->preread_content); - /* "Move" process reference to req_impl. */ - req_impl->process = nxt_unit_msg_get_process(ctx, recv_msg); - if (nxt_slow_path(req_impl->process == NULL)) { - return NXT_UNIT_ERROR; - } - - recv_msg->process = NULL; - req_impl->stream = recv_msg->stream; req_impl->outgoing_buf = NULL; @@ -1174,6 +1159,7 @@ nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, nxt_unit_ctx_t *ctx; nxt_unit_impl_t *lib; nxt_unit_port_t *port; + nxt_unit_process_t *process; nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_port_impl_t *port_impl; nxt_unit_request_info_impl_t *req_impl; @@ -1244,15 +1230,28 @@ nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, return NXT_UNIT_ERROR; } - req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); + process = nxt_unit_process_find(lib, port_id->pid, 0); + if (nxt_slow_path(process == NULL)) { + nxt_unit_alert(ctx, "check_response_port: process %d not found", + port->id.pid); + + nxt_unit_port_hash_find(&lib->ports, port_id, 1); - nxt_queue_insert_tail(&req_impl->process->ports, &port_impl->link); + pthread_mutex_unlock(&lib->mutex); + + free(port); + + return NXT_UNIT_ERROR; + } - port_impl->process = req_impl->process; + nxt_queue_insert_tail(&process->ports, &port_impl->link); + port_impl->process = process; nxt_queue_init(&port_impl->awaiting_req); + req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); + nxt_queue_insert_tail(&port_impl->awaiting_req, &req_impl->port_wait_link); port_impl->use_count = 2; @@ -1262,8 +1261,6 @@ nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, pthread_mutex_unlock(&lib->mutex); - nxt_unit_process_use(port_impl->process); - res = nxt_unit_get_port(ctx, port_id); if (nxt_slow_path(res == NXT_UNIT_ERROR)) { return NXT_UNIT_ERROR; @@ -1511,16 +1508,6 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req) req->content_fd = -1; } - /* - * Process release should go after buffers release to guarantee mmap - * existence. - */ - if (req_impl->process != NULL) { - nxt_unit_process_release(req_impl->process); - - req_impl->process = NULL; - } - if (req->response_port != NULL) { nxt_unit_port_release(req->response_port); @@ -2111,32 +2098,6 @@ nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size) } -static nxt_unit_process_t * -nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) -{ - nxt_unit_impl_t *lib; - - if (recv_msg->process != NULL) { - return recv_msg->process; - } - - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - - pthread_mutex_lock(&lib->mutex); - - recv_msg->process = nxt_unit_process_find(lib, recv_msg->pid, 0); - - pthread_mutex_unlock(&lib->mutex); - - if (recv_msg->process == NULL) { - nxt_unit_warn(ctx, "#%"PRIu32": process %d not found", - recv_msg->stream, (int) recv_msg->pid); - } - - return recv_msg->process; -} - - static nxt_unit_mmap_buf_t * nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx) { @@ -2398,12 +2359,11 @@ nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req, mmap_buf->hdr = NULL; } - nxt_atomic_fetch_add(&mmap_buf->process->outgoing.allocated_chunks, + nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, (int) m.mmap_msg.chunk_id - (int) first_free_chunk); - nxt_unit_debug(req->ctx, "process %d allocated_chunks %d", - mmap_buf->process->pid, - (int) mmap_buf->process->outgoing.allocated_chunks); + nxt_unit_debug(req->ctx, "allocated_chunks %d", + (int) lib->outgoing.allocated_chunks); } else { if (nxt_slow_path(mmap_buf->plain_ptr == NULL @@ -2463,7 +2423,6 @@ nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf) { if (mmap_buf->hdr != NULL) { nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx, - mmap_buf->process, mmap_buf->hdr, mmap_buf->buf.start, mmap_buf->buf.end - mmap_buf->buf.start); @@ -2881,7 +2840,6 @@ nxt_unit_request_preread(nxt_unit_request_info_t *req, size_t size) mmap_buf->buf.start = mmap_buf->free_ptr; mmap_buf->buf.free = mmap_buf->buf.start; mmap_buf->buf.end = mmap_buf->buf.start + size; - mmap_buf->process = NULL; res = read(req->content_fd, mmap_buf->free_ptr, size); if (res < 0) { @@ -3184,28 +3142,19 @@ nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, uint32_t outgoing_size; nxt_unit_mmap_t *mm, *mm_end; nxt_unit_impl_t *lib; - nxt_unit_process_t *process; nxt_port_mmap_header_t *hdr; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - process = nxt_unit_port_process(port); - if (nxt_slow_path(process == NULL)) { - nxt_unit_alert(ctx, "mmap_get: port %d,%d already closed", - (int) port->id.pid, (int) port->id.id); - - return NULL; - } - - pthread_mutex_lock(&process->outgoing.mutex); + pthread_mutex_lock(&lib->outgoing.mutex); retry: - outgoing_size = process->outgoing.size; + outgoing_size = lib->outgoing.size; - mm_end = process->outgoing.elts + outgoing_size; + mm_end = lib->outgoing.elts + outgoing_size; - for (mm = process->outgoing.elts; mm < mm_end; mm++) { + for (mm = lib->outgoing.elts; mm < mm_end; mm++) { hdr = mm->hdr; if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port->id.id) { @@ -3252,13 +3201,13 @@ retry: if (outgoing_size >= lib->shm_mmap_limit) { /* Cannot allocate more shared memory. */ - pthread_mutex_unlock(&process->outgoing.mutex); + pthread_mutex_unlock(&lib->outgoing.mutex); if (min_n == 0) { *n = 0; } - if (nxt_slow_path(process->outgoing.allocated_chunks + min_n + if (nxt_slow_path(lib->outgoing.allocated_chunks + min_n >= lib->shm_mmap_limit * PORT_MMAP_CHUNK_COUNT)) { /* Memory allocated by application, but not send to router. */ @@ -3287,7 +3236,7 @@ retry: nxt_unit_debug(ctx, "oosm: retry"); - pthread_mutex_lock(&process->outgoing.mutex); + pthread_mutex_lock(&lib->outgoing.mutex); goto retry; } @@ -3297,13 +3246,12 @@ retry: unlock: - nxt_atomic_fetch_add(&process->outgoing.allocated_chunks, *n); + nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, *n); - nxt_unit_debug(ctx, "process %d allocated_chunks %d", - process->pid, - (int) process->outgoing.allocated_chunks); + nxt_unit_debug(ctx, "allocated_chunks %d", + (int) lib->outgoing.allocated_chunks); - pthread_mutex_unlock(&process->outgoing.mutex); + pthread_mutex_unlock(&lib->outgoing.mutex); return hdr; } @@ -3448,20 +3396,11 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n) char name[64]; nxt_unit_mmap_t *mm; nxt_unit_impl_t *lib; - nxt_unit_process_t *process; nxt_port_mmap_header_t *hdr; - process = nxt_unit_port_process(port); - if (nxt_slow_path(process == NULL)) { - nxt_unit_alert(ctx, "new_mmap: port %d,%d already closed", - (int) port->id.pid, (int) port->id.id); - - return NULL; - } - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - mm = nxt_unit_mmap_at(&process->outgoing, process->outgoing.size); + mm = nxt_unit_mmap_at(&lib->outgoing, lib->outgoing.size); if (nxt_slow_path(mm == NULL)) { nxt_unit_alert(ctx, "failed to add mmap to outgoing array"); @@ -3538,9 +3477,9 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n) memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map)); memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map)); - hdr->id = process->outgoing.size - 1; + hdr->id = lib->outgoing.size - 1; hdr->src_pid = lib->pid; - hdr->dst_pid = process->pid; + hdr->dst_pid = port->id.pid; hdr->sent_over = port->id.id; /* Mark first n chunk(s) as busy */ @@ -3552,7 +3491,7 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n) nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT); nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT); - pthread_mutex_unlock(&process->outgoing.mutex); + pthread_mutex_unlock(&lib->outgoing.mutex); rc = nxt_unit_send_mmap(ctx, port, fd); if (nxt_slow_path(rc != NXT_UNIT_OK)) { @@ -3561,12 +3500,12 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n) } else { nxt_unit_debug(ctx, "new mmap #%"PRIu32" created for %d -> %d", - hdr->id, (int) lib->pid, (int) process->pid); + hdr->id, (int) lib->pid, (int) port->id.pid); } close(fd); - pthread_mutex_lock(&process->outgoing.mutex); + pthread_mutex_lock(&lib->outgoing.mutex); if (nxt_fast_path(hdr != NULL)) { return hdr; @@ -3574,7 +3513,7 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n) remove_fail: - process->outgoing.size--; + lib->outgoing.size--; return NULL; } @@ -3662,7 +3601,6 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, mmap_buf->buf.start = mmap_buf->plain_ptr + sizeof(nxt_port_msg_t); mmap_buf->buf.free = mmap_buf->buf.start; mmap_buf->buf.end = mmap_buf->buf.start + size; - mmap_buf->process = nxt_unit_port_process(port); nxt_unit_debug(ctx, "outgoing plain buffer allocation: (%p, %d)", mmap_buf->buf.start, (int) size); @@ -3692,7 +3630,6 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, mmap_buf->buf.start = (char *) nxt_port_mmap_chunk_start(hdr, c); mmap_buf->buf.free = mmap_buf->buf.start; mmap_buf->buf.end = mmap_buf->buf.start + nchunks * PORT_MMAP_CHUNK_SIZE; - mmap_buf->process = nxt_unit_port_process(port); mmap_buf->free_ptr = NULL; mmap_buf->ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); @@ -3713,7 +3650,6 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) struct stat mmap_stat; nxt_unit_mmap_t *mm; nxt_unit_impl_t *lib; - nxt_unit_process_t *process; nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_read_buf_t *rbuf; nxt_port_mmap_header_t *hdr; @@ -3722,60 +3658,47 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) nxt_unit_debug(ctx, "incoming_mmap: fd %d from process %d", fd, (int) pid); - pthread_mutex_lock(&lib->mutex); - - process = nxt_unit_process_find(lib, pid, 0); - - pthread_mutex_unlock(&lib->mutex); - - if (nxt_slow_path(process == NULL)) { - nxt_unit_warn(ctx, "incoming_mmap: process %d not found, fd %d", - (int) pid, fd); - - return NXT_UNIT_ERROR; - } - - rc = NXT_UNIT_ERROR; - if (fstat(fd, &mmap_stat) == -1) { - nxt_unit_warn(ctx, "incoming_mmap: fstat(%d) failed: %s (%d)", fd, - strerror(errno), errno); + nxt_unit_alert(ctx, "incoming_mmap: fstat(%d) failed: %s (%d)", fd, + strerror(errno), errno); - goto fail; + return NXT_UNIT_ERROR; } mem = mmap(NULL, mmap_stat.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); if (nxt_slow_path(mem == MAP_FAILED)) { - nxt_unit_warn(ctx, "incoming_mmap: mmap() failed: %s (%d)", - strerror(errno), errno); + nxt_unit_alert(ctx, "incoming_mmap: mmap() failed: %s (%d)", + strerror(errno), errno); - goto fail; + return NXT_UNIT_ERROR; } hdr = mem; if (nxt_slow_path(hdr->src_pid != pid)) { - nxt_unit_warn(ctx, "incoming_mmap: unexpected pid in mmap header " - "detected: %d != %d or %d != %d", (int) hdr->src_pid, - (int) pid, (int) hdr->dst_pid, (int) lib->pid); + nxt_unit_alert(ctx, "incoming_mmap: unexpected pid in mmap header " + "detected: %d != %d or %d != %d", (int) hdr->src_pid, + (int) pid, (int) hdr->dst_pid, (int) lib->pid); munmap(mem, PORT_MMAP_SIZE); - goto fail; + return NXT_UNIT_ERROR; } nxt_queue_init(&awaiting_rbuf); - pthread_mutex_lock(&process->incoming.mutex); + pthread_mutex_lock(&lib->incoming.mutex); - mm = nxt_unit_mmap_at(&process->incoming, hdr->id); + mm = nxt_unit_mmap_at(&lib->incoming, hdr->id); if (nxt_slow_path(mm == NULL)) { - nxt_unit_warn(ctx, "incoming_mmap: failed to add to incoming array"); + nxt_unit_alert(ctx, "incoming_mmap: failed to add to incoming array"); munmap(mem, PORT_MMAP_SIZE); + rc = NXT_UNIT_ERROR; + } else { mm->hdr = hdr; @@ -3787,7 +3710,7 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) rc = NXT_UNIT_OK; } - pthread_mutex_unlock(&process->incoming.mutex); + pthread_mutex_unlock(&lib->incoming.mutex); nxt_queue_each(rbuf, &awaiting_rbuf, nxt_unit_read_buf_t, link) { @@ -3803,10 +3726,6 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) } nxt_queue_loop; -fail: - - nxt_unit_process_release(process); - return rc; } @@ -3840,9 +3759,6 @@ nxt_unit_process_release(nxt_unit_process_t *process) if (c == 1) { nxt_unit_debug(NULL, "destroy process #%d", (int) process->pid); - nxt_unit_mmaps_destroy(&process->incoming); - nxt_unit_mmaps_destroy(&process->outgoing); - free(process); } } @@ -3873,7 +3789,7 @@ nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, { int res; nxt_chunk_id_t c; - nxt_unit_process_t *process; + nxt_unit_impl_t *lib; nxt_port_mmap_header_t *hdr; nxt_port_mmap_tracking_msg_t *tracking_msg; @@ -3889,14 +3805,11 @@ nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, recv_msg->start = tracking_msg + 1; recv_msg->size -= sizeof(nxt_port_mmap_tracking_msg_t); - process = nxt_unit_msg_get_process(ctx, recv_msg); - if (nxt_slow_path(process == NULL)) { - return NXT_UNIT_ERROR; - } + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - pthread_mutex_lock(&process->incoming.mutex); + pthread_mutex_lock(&lib->incoming.mutex); - res = nxt_unit_check_rbuf_mmap(ctx, &process->incoming, + res = nxt_unit_check_rbuf_mmap(ctx, &lib->incoming, recv_msg->pid, tracking_msg->mmap_id, &hdr, rbuf); @@ -3919,7 +3832,7 @@ nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, res = NXT_UNIT_OK; } - pthread_mutex_unlock(&process->incoming.mutex); + pthread_mutex_unlock(&lib->incoming.mutex); return res; } @@ -3979,8 +3892,8 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, int res; void *start; uint32_t size; + nxt_unit_impl_t *lib; nxt_unit_mmaps_t *mmaps; - nxt_unit_process_t *process; nxt_unit_mmap_buf_t *b, **incoming_tail; nxt_port_mmap_msg_t *mmap_msg, *end; nxt_port_mmap_header_t *hdr; @@ -3992,11 +3905,6 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, return NXT_UNIT_ERROR; } - process = nxt_unit_msg_get_process(ctx, recv_msg); - if (nxt_slow_path(process == NULL)) { - return NXT_UNIT_ERROR; - } - mmap_msg = recv_msg->start; end = nxt_pointer_to(recv_msg->start, recv_msg->size); @@ -4023,7 +3931,9 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, b = recv_msg->incoming_buf; mmap_msg = recv_msg->start; - mmaps = &process->incoming; + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + + mmaps = &lib->incoming; pthread_mutex_lock(&mmaps->mutex); @@ -4052,7 +3962,6 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, b->buf.free = start; b->buf.end = b->buf.start + size; b->hdr = hdr; - b->process = process; b = b->next; @@ -4105,8 +4014,7 @@ nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id) static void -nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, - nxt_unit_process_t *process, nxt_port_mmap_header_t *hdr, +nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, nxt_port_mmap_header_t *hdr, void *start, uint32_t size) { int freed_chunks; @@ -4132,12 +4040,10 @@ nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); if (hdr->src_pid == lib->pid && freed_chunks != 0) { - nxt_atomic_fetch_add(&process->outgoing.allocated_chunks, - -freed_chunks); + nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, -freed_chunks); - nxt_unit_debug(ctx, "process %d allocated_chunks %d", - process->pid, - (int) process->outgoing.allocated_chunks); + nxt_unit_debug(ctx, "allocated_chunks %d", + (int) lib->outgoing.allocated_chunks); } if (hdr->dst_pid == lib->pid @@ -4241,9 +4147,6 @@ nxt_unit_process_get(nxt_unit_impl_t *lib, pid_t pid) nxt_queue_init(&process->ports); - nxt_unit_mmaps_init(&process->incoming); - nxt_unit_mmaps_init(&process->outgoing); - lhq.replace = 0; lhq.value = process; @@ -4255,8 +4158,6 @@ nxt_unit_process_get(nxt_unit_impl_t *lib, pid_t pid) default: nxt_unit_alert(NULL, "process %d insert failed", (int) pid); - pthread_mutex_destroy(&process->outgoing.mutex); - pthread_mutex_destroy(&process->incoming.mutex); free(process); process = NULL; break; @@ -4907,17 +4808,6 @@ nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port) } -nxt_inline nxt_unit_process_t * -nxt_unit_port_process(nxt_unit_port_t *port) -{ - nxt_unit_port_impl_t *port_impl; - - port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); - - return port_impl->process; -} - - static nxt_unit_port_t * nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) { |