summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/nxt_process.c1
-rw-r--r--src/nxt_process.h1
-rw-r--r--src/nxt_runtime.c3
-rw-r--r--src/nxt_unit.c296
4 files changed, 93 insertions, 208 deletions
diff --git a/src/nxt_process.c b/src/nxt_process.c
index 0b3aa40f..9bfae395 100644
--- a/src/nxt_process.c
+++ b/src/nxt_process.c
@@ -146,7 +146,6 @@ nxt_process_child_fixup(nxt_task_t *task, nxt_process_t *process)
}
nxt_port_mmaps_destroy(&p->incoming, 0);
- nxt_port_mmaps_destroy(&p->outgoing, 0);
} nxt_runtime_process_loop;
diff --git a/src/nxt_process.h b/src/nxt_process.h
index 4076cefc..ecd813e2 100644
--- a/src/nxt_process.h
+++ b/src/nxt_process.h
@@ -92,7 +92,6 @@ typedef struct {
nxt_int_t use_count;
nxt_port_mmaps_t incoming;
- nxt_port_mmaps_t outgoing;
nxt_thread_mutex_t cp_mutex;
diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c
index c25b93cc..5f4b3e58 100644
--- a/src/nxt_runtime.c
+++ b/src/nxt_runtime.c
@@ -1377,7 +1377,6 @@ nxt_runtime_process_new(nxt_runtime_t *rt)
nxt_queue_init(&process->ports);
nxt_thread_mutex_create(&process->incoming.mutex);
- nxt_thread_mutex_create(&process->outgoing.mutex);
nxt_thread_mutex_create(&process->cp_mutex);
process->use_count = 1;
@@ -1397,10 +1396,8 @@ nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process)
nxt_assert(process->registered == 0);
nxt_port_mmaps_destroy(&process->incoming, 1);
- nxt_port_mmaps_destroy(&process->outgoing, 1);
nxt_thread_mutex_destroy(&process->incoming.mutex);
- nxt_thread_mutex_destroy(&process->outgoing.mutex);
nxt_thread_mutex_destroy(&process->cp_mutex);
/* processes from nxt_runtime_process_get() have no memory pool */
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index 7fb2826d..154fd480 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -70,8 +70,6 @@ static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get(
nxt_unit_ctx_t *ctx);
static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws);
static void nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws);
-static nxt_unit_process_t *nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx,
- nxt_unit_recv_msg_t *recv_msg);
static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx);
static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf);
static int nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
@@ -114,7 +112,6 @@ static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf);
static int nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id);
static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
- nxt_unit_process_t *process,
nxt_port_mmap_header_t *hdr, void *start, uint32_t size);
static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid);
@@ -137,7 +134,6 @@ static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port);
nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port);
-nxt_inline nxt_unit_process_t *nxt_unit_port_process(nxt_unit_port_t *port);
static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx,
nxt_unit_port_t *port);
static void nxt_unit_remove_port(nxt_unit_impl_t *lib,
@@ -179,7 +175,6 @@ struct nxt_unit_mmap_buf_s {
nxt_port_mmap_header_t *hdr;
nxt_unit_request_info_t *req;
nxt_unit_ctx_impl_t *ctx_impl;
- nxt_unit_process_t *process;
char *free_ptr;
char *plain_ptr;
};
@@ -197,7 +192,6 @@ struct nxt_unit_recv_msg_s {
uint32_t size;
int fd;
- nxt_unit_process_t *process;
nxt_unit_mmap_buf_t *incoming_buf;
};
@@ -217,8 +211,6 @@ struct nxt_unit_request_info_impl_s {
uint32_t stream;
- nxt_unit_process_t *process;
-
nxt_unit_mmap_buf_t *outgoing_buf;
nxt_unit_mmap_buf_t *incoming_buf;
@@ -296,6 +288,23 @@ struct nxt_unit_ctx_impl_s {
};
+struct nxt_unit_mmap_s {
+ nxt_port_mmap_header_t *hdr;
+
+ /* of nxt_unit_read_buf_t */
+ nxt_queue_t awaiting_rbuf;
+};
+
+
+struct nxt_unit_mmaps_s {
+ pthread_mutex_t mutex;
+ uint32_t size;
+ uint32_t cap;
+ nxt_atomic_t allocated_chunks;
+ nxt_unit_mmap_t *elts;
+};
+
+
struct nxt_unit_impl_s {
nxt_unit_t unit;
nxt_unit_callbacks_t callbacks;
@@ -315,6 +324,9 @@ struct nxt_unit_impl_s {
nxt_queue_t contexts; /* of nxt_unit_ctx_impl_t */
+ nxt_unit_mmaps_t incoming;
+ nxt_unit_mmaps_t outgoing;
+
pid_t pid;
int log_fd;
int online;
@@ -339,31 +351,11 @@ struct nxt_unit_port_impl_s {
};
-struct nxt_unit_mmap_s {
- nxt_port_mmap_header_t *hdr;
-
- /* of nxt_unit_read_buf_t */
- nxt_queue_t awaiting_rbuf;
-};
-
-
-struct nxt_unit_mmaps_s {
- pthread_mutex_t mutex;
- uint32_t size;
- uint32_t cap;
- nxt_atomic_t allocated_chunks;
- nxt_unit_mmap_t *elts;
-};
-
-
struct nxt_unit_process_s {
pid_t pid;
nxt_queue_t ports; /* of nxt_unit_port_impl_t */
- nxt_unit_mmaps_t incoming;
- nxt_unit_mmaps_t outgoing;
-
nxt_unit_impl_t *lib;
nxt_atomic_t use_count;
@@ -515,6 +507,9 @@ nxt_unit_create(nxt_unit_init_t *init)
goto fail;
}
+ nxt_unit_mmaps_init(&lib->incoming);
+ nxt_unit_mmaps_init(&lib->outgoing);
+
return lib;
fail:
@@ -640,6 +635,9 @@ nxt_unit_lib_release(nxt_unit_impl_t *lib)
nxt_unit_port_release(lib->shared_port);
}
+ nxt_unit_mmaps_destroy(&lib->incoming);
+ nxt_unit_mmaps_destroy(&lib->outgoing);
+
free(lib);
}
}
@@ -807,7 +805,6 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
rc = NXT_UNIT_ERROR;
recv_msg.fd = -1;
- recv_msg.process = NULL;
port_msg = (nxt_port_msg_t *) rbuf->buf;
cm = (struct cmsghdr *) rbuf->oob;
@@ -967,10 +964,6 @@ fail:
nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
}
- if (recv_msg.process != NULL) {
- nxt_unit_process_release(recv_msg.process);
- }
-
if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) {
#if (NXT_DEBUG)
memset(rbuf->buf, 0xAC, rbuf->size);
@@ -1109,14 +1102,6 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
req->content_buf = req->request_buf;
req->content_buf->free = nxt_unit_sptr_get(&r->preread_content);
- /* "Move" process reference to req_impl. */
- req_impl->process = nxt_unit_msg_get_process(ctx, recv_msg);
- if (nxt_slow_path(req_impl->process == NULL)) {
- return NXT_UNIT_ERROR;
- }
-
- recv_msg->process = NULL;
-
req_impl->stream = recv_msg->stream;
req_impl->outgoing_buf = NULL;
@@ -1174,6 +1159,7 @@ nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
nxt_unit_ctx_t *ctx;
nxt_unit_impl_t *lib;
nxt_unit_port_t *port;
+ nxt_unit_process_t *process;
nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_port_impl_t *port_impl;
nxt_unit_request_info_impl_t *req_impl;
@@ -1244,15 +1230,28 @@ nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
return NXT_UNIT_ERROR;
}
- req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
+ process = nxt_unit_process_find(lib, port_id->pid, 0);
+ if (nxt_slow_path(process == NULL)) {
+ nxt_unit_alert(ctx, "check_response_port: process %d not found",
+ port->id.pid);
+
+ nxt_unit_port_hash_find(&lib->ports, port_id, 1);
- nxt_queue_insert_tail(&req_impl->process->ports, &port_impl->link);
+ pthread_mutex_unlock(&lib->mutex);
+
+ free(port);
+
+ return NXT_UNIT_ERROR;
+ }
- port_impl->process = req_impl->process;
+ nxt_queue_insert_tail(&process->ports, &port_impl->link);
+ port_impl->process = process;
nxt_queue_init(&port_impl->awaiting_req);
+ req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
+
nxt_queue_insert_tail(&port_impl->awaiting_req, &req_impl->port_wait_link);
port_impl->use_count = 2;
@@ -1262,8 +1261,6 @@ nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
pthread_mutex_unlock(&lib->mutex);
- nxt_unit_process_use(port_impl->process);
-
res = nxt_unit_get_port(ctx, port_id);
if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
return NXT_UNIT_ERROR;
@@ -1511,16 +1508,6 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req)
req->content_fd = -1;
}
- /*
- * Process release should go after buffers release to guarantee mmap
- * existence.
- */
- if (req_impl->process != NULL) {
- nxt_unit_process_release(req_impl->process);
-
- req_impl->process = NULL;
- }
-
if (req->response_port != NULL) {
nxt_unit_port_release(req->response_port);
@@ -2111,32 +2098,6 @@ nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
}
-static nxt_unit_process_t *
-nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
-{
- nxt_unit_impl_t *lib;
-
- if (recv_msg->process != NULL) {
- return recv_msg->process;
- }
-
- lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
-
- pthread_mutex_lock(&lib->mutex);
-
- recv_msg->process = nxt_unit_process_find(lib, recv_msg->pid, 0);
-
- pthread_mutex_unlock(&lib->mutex);
-
- if (recv_msg->process == NULL) {
- nxt_unit_warn(ctx, "#%"PRIu32": process %d not found",
- recv_msg->stream, (int) recv_msg->pid);
- }
-
- return recv_msg->process;
-}
-
-
static nxt_unit_mmap_buf_t *
nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
{
@@ -2398,12 +2359,11 @@ nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
mmap_buf->hdr = NULL;
}
- nxt_atomic_fetch_add(&mmap_buf->process->outgoing.allocated_chunks,
+ nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks,
(int) m.mmap_msg.chunk_id - (int) first_free_chunk);
- nxt_unit_debug(req->ctx, "process %d allocated_chunks %d",
- mmap_buf->process->pid,
- (int) mmap_buf->process->outgoing.allocated_chunks);
+ nxt_unit_debug(req->ctx, "allocated_chunks %d",
+ (int) lib->outgoing.allocated_chunks);
} else {
if (nxt_slow_path(mmap_buf->plain_ptr == NULL
@@ -2463,7 +2423,6 @@ nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf)
{
if (mmap_buf->hdr != NULL) {
nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx,
- mmap_buf->process,
mmap_buf->hdr, mmap_buf->buf.start,
mmap_buf->buf.end - mmap_buf->buf.start);
@@ -2881,7 +2840,6 @@ nxt_unit_request_preread(nxt_unit_request_info_t *req, size_t size)
mmap_buf->buf.start = mmap_buf->free_ptr;
mmap_buf->buf.free = mmap_buf->buf.start;
mmap_buf->buf.end = mmap_buf->buf.start + size;
- mmap_buf->process = NULL;
res = read(req->content_fd, mmap_buf->free_ptr, size);
if (res < 0) {
@@ -3184,28 +3142,19 @@ nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
uint32_t outgoing_size;
nxt_unit_mmap_t *mm, *mm_end;
nxt_unit_impl_t *lib;
- nxt_unit_process_t *process;
nxt_port_mmap_header_t *hdr;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
- process = nxt_unit_port_process(port);
- if (nxt_slow_path(process == NULL)) {
- nxt_unit_alert(ctx, "mmap_get: port %d,%d already closed",
- (int) port->id.pid, (int) port->id.id);
-
- return NULL;
- }
-
- pthread_mutex_lock(&process->outgoing.mutex);
+ pthread_mutex_lock(&lib->outgoing.mutex);
retry:
- outgoing_size = process->outgoing.size;
+ outgoing_size = lib->outgoing.size;
- mm_end = process->outgoing.elts + outgoing_size;
+ mm_end = lib->outgoing.elts + outgoing_size;
- for (mm = process->outgoing.elts; mm < mm_end; mm++) {
+ for (mm = lib->outgoing.elts; mm < mm_end; mm++) {
hdr = mm->hdr;
if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port->id.id) {
@@ -3252,13 +3201,13 @@ retry:
if (outgoing_size >= lib->shm_mmap_limit) {
/* Cannot allocate more shared memory. */
- pthread_mutex_unlock(&process->outgoing.mutex);
+ pthread_mutex_unlock(&lib->outgoing.mutex);
if (min_n == 0) {
*n = 0;
}
- if (nxt_slow_path(process->outgoing.allocated_chunks + min_n
+ if (nxt_slow_path(lib->outgoing.allocated_chunks + min_n
>= lib->shm_mmap_limit * PORT_MMAP_CHUNK_COUNT))
{
/* Memory allocated by application, but not send to router. */
@@ -3287,7 +3236,7 @@ retry:
nxt_unit_debug(ctx, "oosm: retry");
- pthread_mutex_lock(&process->outgoing.mutex);
+ pthread_mutex_lock(&lib->outgoing.mutex);
goto retry;
}
@@ -3297,13 +3246,12 @@ retry:
unlock:
- nxt_atomic_fetch_add(&process->outgoing.allocated_chunks, *n);
+ nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, *n);
- nxt_unit_debug(ctx, "process %d allocated_chunks %d",
- process->pid,
- (int) process->outgoing.allocated_chunks);
+ nxt_unit_debug(ctx, "allocated_chunks %d",
+ (int) lib->outgoing.allocated_chunks);
- pthread_mutex_unlock(&process->outgoing.mutex);
+ pthread_mutex_unlock(&lib->outgoing.mutex);
return hdr;
}
@@ -3448,20 +3396,11 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n)
char name[64];
nxt_unit_mmap_t *mm;
nxt_unit_impl_t *lib;
- nxt_unit_process_t *process;
nxt_port_mmap_header_t *hdr;
- process = nxt_unit_port_process(port);
- if (nxt_slow_path(process == NULL)) {
- nxt_unit_alert(ctx, "new_mmap: port %d,%d already closed",
- (int) port->id.pid, (int) port->id.id);
-
- return NULL;
- }
-
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
- mm = nxt_unit_mmap_at(&process->outgoing, process->outgoing.size);
+ mm = nxt_unit_mmap_at(&lib->outgoing, lib->outgoing.size);
if (nxt_slow_path(mm == NULL)) {
nxt_unit_alert(ctx, "failed to add mmap to outgoing array");
@@ -3538,9 +3477,9 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n)
memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));
- hdr->id = process->outgoing.size - 1;
+ hdr->id = lib->outgoing.size - 1;
hdr->src_pid = lib->pid;
- hdr->dst_pid = process->pid;
+ hdr->dst_pid = port->id.pid;
hdr->sent_over = port->id.id;
/* Mark first n chunk(s) as busy */
@@ -3552,7 +3491,7 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n)
nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT);
- pthread_mutex_unlock(&process->outgoing.mutex);
+ pthread_mutex_unlock(&lib->outgoing.mutex);
rc = nxt_unit_send_mmap(ctx, port, fd);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
@@ -3561,12 +3500,12 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n)
} else {
nxt_unit_debug(ctx, "new mmap #%"PRIu32" created for %d -> %d",
- hdr->id, (int) lib->pid, (int) process->pid);
+ hdr->id, (int) lib->pid, (int) port->id.pid);
}
close(fd);
- pthread_mutex_lock(&process->outgoing.mutex);
+ pthread_mutex_lock(&lib->outgoing.mutex);
if (nxt_fast_path(hdr != NULL)) {
return hdr;
@@ -3574,7 +3513,7 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n)
remove_fail:
- process->outgoing.size--;
+ lib->outgoing.size--;
return NULL;
}
@@ -3662,7 +3601,6 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
mmap_buf->buf.start = mmap_buf->plain_ptr + sizeof(nxt_port_msg_t);
mmap_buf->buf.free = mmap_buf->buf.start;
mmap_buf->buf.end = mmap_buf->buf.start + size;
- mmap_buf->process = nxt_unit_port_process(port);
nxt_unit_debug(ctx, "outgoing plain buffer allocation: (%p, %d)",
mmap_buf->buf.start, (int) size);
@@ -3692,7 +3630,6 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
mmap_buf->buf.start = (char *) nxt_port_mmap_chunk_start(hdr, c);
mmap_buf->buf.free = mmap_buf->buf.start;
mmap_buf->buf.end = mmap_buf->buf.start + nchunks * PORT_MMAP_CHUNK_SIZE;
- mmap_buf->process = nxt_unit_port_process(port);
mmap_buf->free_ptr = NULL;
mmap_buf->ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
@@ -3713,7 +3650,6 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
struct stat mmap_stat;
nxt_unit_mmap_t *mm;
nxt_unit_impl_t *lib;
- nxt_unit_process_t *process;
nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_read_buf_t *rbuf;
nxt_port_mmap_header_t *hdr;
@@ -3722,60 +3658,47 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
nxt_unit_debug(ctx, "incoming_mmap: fd %d from process %d", fd, (int) pid);
- pthread_mutex_lock(&lib->mutex);
-
- process = nxt_unit_process_find(lib, pid, 0);
-
- pthread_mutex_unlock(&lib->mutex);
-
- if (nxt_slow_path(process == NULL)) {
- nxt_unit_warn(ctx, "incoming_mmap: process %d not found, fd %d",
- (int) pid, fd);
-
- return NXT_UNIT_ERROR;
- }
-
- rc = NXT_UNIT_ERROR;
-
if (fstat(fd, &mmap_stat) == -1) {
- nxt_unit_warn(ctx, "incoming_mmap: fstat(%d) failed: %s (%d)", fd,
- strerror(errno), errno);
+ nxt_unit_alert(ctx, "incoming_mmap: fstat(%d) failed: %s (%d)", fd,
+ strerror(errno), errno);
- goto fail;
+ return NXT_UNIT_ERROR;
}
mem = mmap(NULL, mmap_stat.st_size, PROT_READ | PROT_WRITE,
MAP_SHARED, fd, 0);
if (nxt_slow_path(mem == MAP_FAILED)) {
- nxt_unit_warn(ctx, "incoming_mmap: mmap() failed: %s (%d)",
- strerror(errno), errno);
+ nxt_unit_alert(ctx, "incoming_mmap: mmap() failed: %s (%d)",
+ strerror(errno), errno);
- goto fail;
+ return NXT_UNIT_ERROR;
}
hdr = mem;
if (nxt_slow_path(hdr->src_pid != pid)) {
- nxt_unit_warn(ctx, "incoming_mmap: unexpected pid in mmap header "
- "detected: %d != %d or %d != %d", (int) hdr->src_pid,
- (int) pid, (int) hdr->dst_pid, (int) lib->pid);
+ nxt_unit_alert(ctx, "incoming_mmap: unexpected pid in mmap header "
+ "detected: %d != %d or %d != %d", (int) hdr->src_pid,
+ (int) pid, (int) hdr->dst_pid, (int) lib->pid);
munmap(mem, PORT_MMAP_SIZE);
- goto fail;
+ return NXT_UNIT_ERROR;
}
nxt_queue_init(&awaiting_rbuf);
- pthread_mutex_lock(&process->incoming.mutex);
+ pthread_mutex_lock(&lib->incoming.mutex);
- mm = nxt_unit_mmap_at(&process->incoming, hdr->id);
+ mm = nxt_unit_mmap_at(&lib->incoming, hdr->id);
if (nxt_slow_path(mm == NULL)) {
- nxt_unit_warn(ctx, "incoming_mmap: failed to add to incoming array");
+ nxt_unit_alert(ctx, "incoming_mmap: failed to add to incoming array");
munmap(mem, PORT_MMAP_SIZE);
+ rc = NXT_UNIT_ERROR;
+
} else {
mm->hdr = hdr;
@@ -3787,7 +3710,7 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
rc = NXT_UNIT_OK;
}
- pthread_mutex_unlock(&process->incoming.mutex);
+ pthread_mutex_unlock(&lib->incoming.mutex);
nxt_queue_each(rbuf, &awaiting_rbuf, nxt_unit_read_buf_t, link) {
@@ -3803,10 +3726,6 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
} nxt_queue_loop;
-fail:
-
- nxt_unit_process_release(process);
-
return rc;
}
@@ -3840,9 +3759,6 @@ nxt_unit_process_release(nxt_unit_process_t *process)
if (c == 1) {
nxt_unit_debug(NULL, "destroy process #%d", (int) process->pid);
- nxt_unit_mmaps_destroy(&process->incoming);
- nxt_unit_mmaps_destroy(&process->outgoing);
-
free(process);
}
}
@@ -3873,7 +3789,7 @@ nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
{
int res;
nxt_chunk_id_t c;
- nxt_unit_process_t *process;
+ nxt_unit_impl_t *lib;
nxt_port_mmap_header_t *hdr;
nxt_port_mmap_tracking_msg_t *tracking_msg;
@@ -3889,14 +3805,11 @@ nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
recv_msg->start = tracking_msg + 1;
recv_msg->size -= sizeof(nxt_port_mmap_tracking_msg_t);
- process = nxt_unit_msg_get_process(ctx, recv_msg);
- if (nxt_slow_path(process == NULL)) {
- return NXT_UNIT_ERROR;
- }
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
- pthread_mutex_lock(&process->incoming.mutex);
+ pthread_mutex_lock(&lib->incoming.mutex);
- res = nxt_unit_check_rbuf_mmap(ctx, &process->incoming,
+ res = nxt_unit_check_rbuf_mmap(ctx, &lib->incoming,
recv_msg->pid, tracking_msg->mmap_id,
&hdr, rbuf);
@@ -3919,7 +3832,7 @@ nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
res = NXT_UNIT_OK;
}
- pthread_mutex_unlock(&process->incoming.mutex);
+ pthread_mutex_unlock(&lib->incoming.mutex);
return res;
}
@@ -3979,8 +3892,8 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
int res;
void *start;
uint32_t size;
+ nxt_unit_impl_t *lib;
nxt_unit_mmaps_t *mmaps;
- nxt_unit_process_t *process;
nxt_unit_mmap_buf_t *b, **incoming_tail;
nxt_port_mmap_msg_t *mmap_msg, *end;
nxt_port_mmap_header_t *hdr;
@@ -3992,11 +3905,6 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
return NXT_UNIT_ERROR;
}
- process = nxt_unit_msg_get_process(ctx, recv_msg);
- if (nxt_slow_path(process == NULL)) {
- return NXT_UNIT_ERROR;
- }
-
mmap_msg = recv_msg->start;
end = nxt_pointer_to(recv_msg->start, recv_msg->size);
@@ -4023,7 +3931,9 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
b = recv_msg->incoming_buf;
mmap_msg = recv_msg->start;
- mmaps = &process->incoming;
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
+ mmaps = &lib->incoming;
pthread_mutex_lock(&mmaps->mutex);
@@ -4052,7 +3962,6 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
b->buf.free = start;
b->buf.end = b->buf.start + size;
b->hdr = hdr;
- b->process = process;
b = b->next;
@@ -4105,8 +4014,7 @@ nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id)
static void
-nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
- nxt_unit_process_t *process, nxt_port_mmap_header_t *hdr,
+nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, nxt_port_mmap_header_t *hdr,
void *start, uint32_t size)
{
int freed_chunks;
@@ -4132,12 +4040,10 @@ nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
if (hdr->src_pid == lib->pid && freed_chunks != 0) {
- nxt_atomic_fetch_add(&process->outgoing.allocated_chunks,
- -freed_chunks);
+ nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, -freed_chunks);
- nxt_unit_debug(ctx, "process %d allocated_chunks %d",
- process->pid,
- (int) process->outgoing.allocated_chunks);
+ nxt_unit_debug(ctx, "allocated_chunks %d",
+ (int) lib->outgoing.allocated_chunks);
}
if (hdr->dst_pid == lib->pid
@@ -4241,9 +4147,6 @@ nxt_unit_process_get(nxt_unit_impl_t *lib, pid_t pid)
nxt_queue_init(&process->ports);
- nxt_unit_mmaps_init(&process->incoming);
- nxt_unit_mmaps_init(&process->outgoing);
-
lhq.replace = 0;
lhq.value = process;
@@ -4255,8 +4158,6 @@ nxt_unit_process_get(nxt_unit_impl_t *lib, pid_t pid)
default:
nxt_unit_alert(NULL, "process %d insert failed", (int) pid);
- pthread_mutex_destroy(&process->outgoing.mutex);
- pthread_mutex_destroy(&process->incoming.mutex);
free(process);
process = NULL;
break;
@@ -4907,17 +4808,6 @@ nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port)
}
-nxt_inline nxt_unit_process_t *
-nxt_unit_port_process(nxt_unit_port_t *port)
-{
- nxt_unit_port_impl_t *port_impl;
-
- port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
-
- return port_impl->process;
-}
-
-
static nxt_unit_port_t *
nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
{