summaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2020-08-11 19:20:17 +0300
committerMax Romanov <max.romanov@nginx.com>2020-08-11 19:20:17 +0300
commit2f3d27fa22d2e5566dfdeddfb6a1f8c927a5c73d (patch)
tree4c85437f95778115e3f0039b7bf539563374df35 /src
parent83595606121a821f9e3cef0f0b7e7fe87eb1e50a (diff)
downloadunit-2f3d27fa22d2e5566dfdeddfb6a1f8c927a5c73d.tar.gz
unit-2f3d27fa22d2e5566dfdeddfb6a1f8c927a5c73d.tar.bz2
Process structures refactoring in runtime and libunit.
Generic process-to-process shared memory exchange is no more required. Here, it is transformed into a router-to-application pattern. The outgoing shared memory segments collection is now the property of the application structure. The applications connect to the router only, and the process only needs to group the ports.
Diffstat (limited to '')
-rw-r--r--src/nxt_process.c1
-rw-r--r--src/nxt_process.h1
-rw-r--r--src/nxt_runtime.c3
-rw-r--r--src/nxt_unit.c296
4 files changed, 93 insertions, 208 deletions
diff --git a/src/nxt_process.c b/src/nxt_process.c
index 0b3aa40f..9bfae395 100644
--- a/src/nxt_process.c
+++ b/src/nxt_process.c
@@ -146,7 +146,6 @@ nxt_process_child_fixup(nxt_task_t *task, nxt_process_t *process)
}
nxt_port_mmaps_destroy(&p->incoming, 0);
- nxt_port_mmaps_destroy(&p->outgoing, 0);
} nxt_runtime_process_loop;
diff --git a/src/nxt_process.h b/src/nxt_process.h
index 4076cefc..ecd813e2 100644
--- a/src/nxt_process.h
+++ b/src/nxt_process.h
@@ -92,7 +92,6 @@ typedef struct {
nxt_int_t use_count;
nxt_port_mmaps_t incoming;
- nxt_port_mmaps_t outgoing;
nxt_thread_mutex_t cp_mutex;
diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c
index c25b93cc..5f4b3e58 100644
--- a/src/nxt_runtime.c
+++ b/src/nxt_runtime.c
@@ -1377,7 +1377,6 @@ nxt_runtime_process_new(nxt_runtime_t *rt)
nxt_queue_init(&process->ports);
nxt_thread_mutex_create(&process->incoming.mutex);
- nxt_thread_mutex_create(&process->outgoing.mutex);
nxt_thread_mutex_create(&process->cp_mutex);
process->use_count = 1;
@@ -1397,10 +1396,8 @@ nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process)
nxt_assert(process->registered == 0);
nxt_port_mmaps_destroy(&process->incoming, 1);
- nxt_port_mmaps_destroy(&process->outgoing, 1);
nxt_thread_mutex_destroy(&process->incoming.mutex);
- nxt_thread_mutex_destroy(&process->outgoing.mutex);
nxt_thread_mutex_destroy(&process->cp_mutex);
/* processes from nxt_runtime_process_get() have no memory pool */
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index 7fb2826d..154fd480 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -70,8 +70,6 @@ static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get(
nxt_unit_ctx_t *ctx);
static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws);
static void nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws);
-static nxt_unit_process_t *nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx,
- nxt_unit_recv_msg_t *recv_msg);
static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx);
static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf);
static int nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
@@ -114,7 +112,6 @@ static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf);
static int nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id);
static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
- nxt_unit_process_t *process,
nxt_port_mmap_header_t *hdr, void *start, uint32_t size);
static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid);
@@ -137,7 +134,6 @@ static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port);
nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port);
-nxt_inline nxt_unit_process_t *nxt_unit_port_process(nxt_unit_port_t *port);
static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx,
nxt_unit_port_t *port);
static void nxt_unit_remove_port(nxt_unit_impl_t *lib,
@@ -179,7 +175,6 @@ struct nxt_unit_mmap_buf_s {
nxt_port_mmap_header_t *hdr;
nxt_unit_request_info_t *req;
nxt_unit_ctx_impl_t *ctx_impl;
- nxt_unit_process_t *process;
char *free_ptr;
char *plain_ptr;
};
@@ -197,7 +192,6 @@ struct nxt_unit_recv_msg_s {
uint32_t size;
int fd;
- nxt_unit_process_t *process;
nxt_unit_mmap_buf_t *incoming_buf;
};
@@ -217,8 +211,6 @@ struct nxt_unit_request_info_impl_s {
uint32_t stream;
- nxt_unit_process_t *process;
-
nxt_unit_mmap_buf_t *outgoing_buf;
nxt_unit_mmap_buf_t *incoming_buf;
@@ -296,6 +288,23 @@ struct nxt_unit_ctx_impl_s {
};
+struct nxt_unit_mmap_s {
+ nxt_port_mmap_header_t *hdr;
+
+ /* of nxt_unit_read_buf_t */
+ nxt_queue_t awaiting_rbuf;
+};
+
+
+struct nxt_unit_mmaps_s {
+ pthread_mutex_t mutex;
+ uint32_t size;
+ uint32_t cap;
+ nxt_atomic_t allocated_chunks;
+ nxt_unit_mmap_t *elts;
+};
+
+
struct nxt_unit_impl_s {
nxt_unit_t unit;
nxt_unit_callbacks_t callbacks;
@@ -315,6 +324,9 @@ struct nxt_unit_impl_s {
nxt_queue_t contexts; /* of nxt_unit_ctx_impl_t */
+ nxt_unit_mmaps_t incoming;
+ nxt_unit_mmaps_t outgoing;
+
pid_t pid;
int log_fd;
int online;
@@ -339,31 +351,11 @@ struct nxt_unit_port_impl_s {
};
-struct nxt_unit_mmap_s {
- nxt_port_mmap_header_t *hdr;
-
- /* of nxt_unit_read_buf_t */
- nxt_queue_t awaiting_rbuf;
-};
-
-
-struct nxt_unit_mmaps_s {
- pthread_mutex_t mutex;
- uint32_t size;
- uint32_t cap;
- nxt_atomic_t allocated_chunks;
- nxt_unit_mmap_t *elts;
-};
-
-
struct nxt_unit_process_s {
pid_t pid;
nxt_queue_t ports; /* of nxt_unit_port_impl_t */
- nxt_unit_mmaps_t incoming;
- nxt_unit_mmaps_t outgoing;
-
nxt_unit_impl_t *lib;
nxt_atomic_t use_count;
@@ -515,6 +507,9 @@ nxt_unit_create(nxt_unit_init_t *init)
goto fail;
}
+ nxt_unit_mmaps_init(&lib->incoming);
+ nxt_unit_mmaps_init(&lib->outgoing);
+
return lib;
fail:
@@ -640,6 +635,9 @@ nxt_unit_lib_release(nxt_unit_impl_t *lib)
nxt_unit_port_release(lib->shared_port);
}
+ nxt_unit_mmaps_destroy(&lib->incoming);
+ nxt_unit_mmaps_destroy(&lib->outgoing);
+
free(lib);
}
}
@@ -807,7 +805,6 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
rc = NXT_UNIT_ERROR;
recv_msg.fd = -1;
- recv_msg.process = NULL;
port_msg = (nxt_port_msg_t *) rbuf->buf;
cm = (struct cmsghdr *) rbuf->oob;
@@ -967,10 +964,6 @@ fail:
nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
}
- if (recv_msg.process != NULL) {
- nxt_unit_process_release(recv_msg.process);
- }
-
if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) {
#if (NXT_DEBUG)
memset(rbuf->buf, 0xAC, rbuf->size);
@@ -1109,14 +1102,6 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
req->content_buf = req->request_buf;
req->content_buf->free = nxt_unit_sptr_get(&r->preread_content);
- /* "Move" process reference to req_impl. */
- req_impl->process = nxt_unit_msg_get_process(ctx, recv_msg);
- if (nxt_slow_path(req_impl->process == NULL)) {
- return NXT_UNIT_ERROR;
- }
-
- recv_msg->process = NULL;
-
req_impl->stream = recv_msg->stream;
req_impl->outgoing_buf = NULL;
@@ -1174,6 +1159,7 @@ nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
nxt_unit_ctx_t *ctx;
nxt_unit_impl_t *lib;
nxt_unit_port_t *port;
+ nxt_unit_process_t *process;
nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_port_impl_t *port_impl;
nxt_unit_request_info_impl_t *req_impl;
@@ -1244,15 +1230,28 @@ nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
return NXT_UNIT_ERROR;
}
- req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
+ process = nxt_unit_process_find(lib, port_id->pid, 0);
+ if (nxt_slow_path(process == NULL)) {
+ nxt_unit_alert(ctx, "check_response_port: process %d not found",
+ port->id.pid);
+
+ nxt_unit_port_hash_find(&lib->ports, port_id, 1);
- nxt_queue_insert_tail(&req_impl->process->ports, &port_impl->link);
+ pthread_mutex_unlock(&lib->mutex);
+
+ free(port);
+
+ return NXT_UNIT_ERROR;
+ }
- port_impl->process = req_impl->process;
+ nxt_queue_insert_tail(&process->ports, &port_impl->link);
+ port_impl->process = process;
nxt_queue_init(&port_impl->awaiting_req);
+ req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
+
nxt_queue_insert_tail(&port_impl->awaiting_req, &req_impl->port_wait_link);
port_impl->use_count = 2;
@@ -1262,8 +1261,6 @@ nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
pthread_mutex_unlock(&lib->mutex);
- nxt_unit_process_use(port_impl->process);
-
res = nxt_unit_get_port(ctx, port_id);
if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
return NXT_UNIT_ERROR;
@@ -1511,16 +1508,6 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req)
req->content_fd = -1;
}
- /*
- * Process release should go after buffers release to guarantee mmap
- * existence.
- */
- if (req_impl->process != NULL) {
- nxt_unit_process_release(req_impl->process);
-
- req_impl->process = NULL;
- }
-
if (req->response_port != NULL) {
nxt_unit_port_release(req->response_port);
@@ -2111,32 +2098,6 @@ nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
}
-static nxt_unit_process_t *
-nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
-{
- nxt_unit_impl_t *lib;
-
- if (recv_msg->process != NULL) {
- return recv_msg->process;
- }
-
- lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
-
- pthread_mutex_lock(&lib->mutex);
-
- recv_msg->process = nxt_unit_process_find(lib, recv_msg->pid, 0);
-
- pthread_mutex_unlock(&lib->mutex);
-
- if (recv_msg->process == NULL) {
- nxt_unit_warn(ctx, "#%"PRIu32": process %d not found",
- recv_msg->stream, (int) recv_msg->pid);
- }
-
- return recv_msg->process;
-}
-
-
static nxt_unit_mmap_buf_t *
nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
{
@@ -2398,12 +2359,11 @@ nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
mmap_buf->hdr = NULL;
}
- nxt_atomic_fetch_add(&mmap_buf->process->outgoing.allocated_chunks,
+ nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks,
(int) m.mmap_msg.chunk_id - (int) first_free_chunk);
- nxt_unit_debug(req->ctx, "process %d allocated_chunks %d",
- mmap_buf->process->pid,
- (int) mmap_buf->process->outgoing.allocated_chunks);
+ nxt_unit_debug(req->ctx, "allocated_chunks %d",
+ (int) lib->outgoing.allocated_chunks);
} else {
if (nxt_slow_path(mmap_buf->plain_ptr == NULL
@@ -2463,7 +2423,6 @@ nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf)
{
if (mmap_buf->hdr != NULL) {
nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx,
- mmap_buf->process,
mmap_buf->hdr, mmap_buf->buf.start,
mmap_buf->buf.end - mmap_buf->buf.start);
@@ -2881,7 +2840,6 @@ nxt_unit_request_preread(nxt_unit_request_info_t *req, size_t size)
mmap_buf->buf.start = mmap_buf->free_ptr;
mmap_buf->buf.free = mmap_buf->buf.start;
mmap_buf->buf.end = mmap_buf->buf.start + size;
- mmap_buf->process = NULL;
res = read(req->content_fd, mmap_buf->free_ptr, size);
if (res < 0) {
@@ -3184,28 +3142,19 @@ nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
uint32_t outgoing_size;
nxt_unit_mmap_t *mm, *mm_end;
nxt_unit_impl_t *lib;
- nxt_unit_process_t *process;
nxt_port_mmap_header_t *hdr;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
- process = nxt_unit_port_process(port);
- if (nxt_slow_path(process == NULL)) {
- nxt_unit_alert(ctx, "mmap_get: port %d,%d already closed",
- (int) port->id.pid, (int) port->id.id);
-
- return NULL;
- }
-
- pthread_mutex_lock(&process->outgoing.mutex);
+ pthread_mutex_lock(&lib->outgoing.mutex);
retry:
- outgoing_size = process->outgoing.size;
+ outgoing_size = lib->outgoing.size;
- mm_end = process->outgoing.elts + outgoing_size;
+ mm_end = lib->outgoing.elts + outgoing_size;
- for (mm = process->outgoing.elts; mm < mm_end; mm++) {
+ for (mm = lib->outgoing.elts; mm < mm_end; mm++) {
hdr = mm->hdr;
if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port->id.id) {
@@ -3252,13 +3201,13 @@ retry:
if (outgoing_size >= lib->shm_mmap_limit) {
/* Cannot allocate more shared memory. */
- pthread_mutex_unlock(&process->outgoing.mutex);
+ pthread_mutex_unlock(&lib->outgoing.mutex);
if (min_n == 0) {
*n = 0;
}
- if (nxt_slow_path(process->outgoing.allocated_chunks + min_n
+ if (nxt_slow_path(lib->outgoing.allocated_chunks + min_n
>= lib->shm_mmap_limit * PORT_MMAP_CHUNK_COUNT))
{
/* Memory allocated by application, but not send to router. */
@@ -3287,7 +3236,7 @@ retry:
nxt_unit_debug(ctx, "oosm: retry");
- pthread_mutex_lock(&process->outgoing.mutex);
+ pthread_mutex_lock(&lib->outgoing.mutex);
goto retry;
}
@@ -3297,13 +3246,12 @@ retry:
unlock:
- nxt_atomic_fetch_add(&process->outgoing.allocated_chunks, *n);
+ nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, *n);
- nxt_unit_debug(ctx, "process %d allocated_chunks %d",
- process->pid,
- (int) process->outgoing.allocated_chunks);
+ nxt_unit_debug(ctx, "allocated_chunks %d",
+ (int) lib->outgoing.allocated_chunks);
- pthread_mutex_unlock(&process->outgoing.mutex);
+ pthread_mutex_unlock(&lib->outgoing.mutex);
return hdr;
}
@@ -3448,20 +3396,11 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n)
char name[64];
nxt_unit_mmap_t *mm;
nxt_unit_impl_t *lib;
- nxt_unit_process_t *process;
nxt_port_mmap_header_t *hdr;
- process = nxt_unit_port_process(port);
- if (nxt_slow_path(process == NULL)) {
- nxt_unit_alert(ctx, "new_mmap: port %d,%d already closed",
- (int) port->id.pid, (int) port->id.id);
-
- return NULL;
- }
-
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
- mm = nxt_unit_mmap_at(&process->outgoing, process->outgoing.size);
+ mm = nxt_unit_mmap_at(&lib->outgoing, lib->outgoing.size);
if (nxt_slow_path(mm == NULL)) {
nxt_unit_alert(ctx, "failed to add mmap to outgoing array");
@@ -3538,9 +3477,9 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n)
memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));
- hdr->id = process->outgoing.size - 1;
+ hdr->id = lib->outgoing.size - 1;
hdr->src_pid = lib->pid;
- hdr->dst_pid = process->pid;
+ hdr->dst_pid = port->id.pid;
hdr->sent_over = port->id.id;
/* Mark first n chunk(s) as busy */
@@ -3552,7 +3491,7 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n)
nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT);
- pthread_mutex_unlock(&process->outgoing.mutex);
+ pthread_mutex_unlock(&lib->outgoing.mutex);
rc = nxt_unit_send_mmap(ctx, port, fd);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
@@ -3561,12 +3500,12 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n)
} else {
nxt_unit_debug(ctx, "new mmap #%"PRIu32" created for %d -> %d",
- hdr->id, (int) lib->pid, (int) process->pid);
+ hdr->id, (int) lib->pid, (int) port->id.pid);
}
close(fd);
- pthread_mutex_lock(&process->outgoing.mutex);
+ pthread_mutex_lock(&lib->outgoing.mutex);
if (nxt_fast_path(hdr != NULL)) {
return hdr;
@@ -3574,7 +3513,7 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n)
remove_fail:
- process->outgoing.size--;
+ lib->outgoing.size--;
return NULL;
}
@@ -3662,7 +3601,6 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
mmap_buf->buf.start = mmap_buf->plain_ptr + sizeof(nxt_port_msg_t);
mmap_buf->buf.free = mmap_buf->buf.start;
mmap_buf->buf.end = mmap_buf->buf.start + size;
- mmap_buf->process = nxt_unit_port_process(port);
nxt_unit_debug(ctx, "outgoing plain buffer allocation: (%p, %d)",
mmap_buf->buf.start, (int) size);
@@ -3692,7 +3630,6 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
mmap_buf->buf.start = (char *) nxt_port_mmap_chunk_start(hdr, c);
mmap_buf->buf.free = mmap_buf->buf.start;
mmap_buf->buf.end = mmap_buf->buf.start + nchunks * PORT_MMAP_CHUNK_SIZE;
- mmap_buf->process = nxt_unit_port_process(port);
mmap_buf->free_ptr = NULL;
mmap_buf->ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
@@ -3713,7 +3650,6 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
struct stat mmap_stat;
nxt_unit_mmap_t *mm;
nxt_unit_impl_t *lib;
- nxt_unit_process_t *process;
nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_read_buf_t *rbuf;
nxt_port_mmap_header_t *hdr;
@@ -3722,60 +3658,47 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
nxt_unit_debug(ctx, "incoming_mmap: fd %d from process %d", fd, (int) pid);
- pthread_mutex_lock(&lib->mutex);
-
- process = nxt_unit_process_find(lib, pid, 0);
-
- pthread_mutex_unlock(&lib->mutex);
-
- if (nxt_slow_path(process == NULL)) {
- nxt_unit_warn(ctx, "incoming_mmap: process %d not found, fd %d",
- (int) pid, fd);
-
- return NXT_UNIT_ERROR;
- }
-
- rc = NXT_UNIT_ERROR;
-
if (fstat(fd, &mmap_stat) == -1) {
- nxt_unit_warn(ctx, "incoming_mmap: fstat(%d) failed: %s (%d)", fd,
- strerror(errno), errno);
+ nxt_unit_alert(ctx, "incoming_mmap: fstat(%d) failed: %s (%d)", fd,
+ strerror(errno), errno);
- goto fail;
+ return NXT_UNIT_ERROR;
}
mem = mmap(NULL, mmap_stat.st_size, PROT_READ | PROT_WRITE,
MAP_SHARED, fd, 0);
if (nxt_slow_path(mem == MAP_FAILED)) {
- nxt_unit_warn(ctx, "incoming_mmap: mmap() failed: %s (%d)",
- strerror(errno), errno);
+ nxt_unit_alert(ctx, "incoming_mmap: mmap() failed: %s (%d)",
+ strerror(errno), errno);
- goto fail;
+ return NXT_UNIT_ERROR;
}
hdr = mem;
if (nxt_slow_path(hdr->src_pid != pid)) {
- nxt_unit_warn(ctx, "incoming_mmap: unexpected pid in mmap header "
- "detected: %d != %d or %d != %d", (int) hdr->src_pid,
- (int) pid, (int) hdr->dst_pid, (int) lib->pid);
+ nxt_unit_alert(ctx, "incoming_mmap: unexpected pid in mmap header "
+ "detected: %d != %d or %d != %d", (int) hdr->src_pid,
+ (int) pid, (int) hdr->dst_pid, (int) lib->pid);
munmap(mem, PORT_MMAP_SIZE);
- goto fail;
+ return NXT_UNIT_ERROR;
}
nxt_queue_init(&awaiting_rbuf);
- pthread_mutex_lock(&process->incoming.mutex);
+ pthread_mutex_lock(&lib->incoming.mutex);
- mm = nxt_unit_mmap_at(&process->incoming, hdr->id);
+ mm = nxt_unit_mmap_at(&lib->incoming, hdr->id);
if (nxt_slow_path(mm == NULL)) {
- nxt_unit_warn(ctx, "incoming_mmap: failed to add to incoming array");
+ nxt_unit_alert(ctx, "incoming_mmap: failed to add to incoming array");
munmap(mem, PORT_MMAP_SIZE);
+ rc = NXT_UNIT_ERROR;
+
} else {
mm->hdr = hdr;
@@ -3787,7 +3710,7 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
rc = NXT_UNIT_OK;
}
- pthread_mutex_unlock(&process->incoming.mutex);
+ pthread_mutex_unlock(&lib->incoming.mutex);
nxt_queue_each(rbuf, &awaiting_rbuf, nxt_unit_read_buf_t, link) {
@@ -3803,10 +3726,6 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
} nxt_queue_loop;
-fail:
-
- nxt_unit_process_release(process);
-
return rc;
}
@@ -3840,9 +3759,6 @@ nxt_unit_process_release(nxt_unit_process_t *process)
if (c == 1) {
nxt_unit_debug(NULL, "destroy process #%d", (int) process->pid);
- nxt_unit_mmaps_destroy(&process->incoming);
- nxt_unit_mmaps_destroy(&process->outgoing);
-
free(process);
}
}
@@ -3873,7 +3789,7 @@ nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
{
int res;
nxt_chunk_id_t c;
- nxt_unit_process_t *process;
+ nxt_unit_impl_t *lib;
nxt_port_mmap_header_t *hdr;
nxt_port_mmap_tracking_msg_t *tracking_msg;
@@ -3889,14 +3805,11 @@ nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
recv_msg->start = tracking_msg + 1;
recv_msg->size -= sizeof(nxt_port_mmap_tracking_msg_t);
- process = nxt_unit_msg_get_process(ctx, recv_msg);
- if (nxt_slow_path(process == NULL)) {
- return NXT_UNIT_ERROR;
- }
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
- pthread_mutex_lock(&process->incoming.mutex);
+ pthread_mutex_lock(&lib->incoming.mutex);
- res = nxt_unit_check_rbuf_mmap(ctx, &process->incoming,
+ res = nxt_unit_check_rbuf_mmap(ctx, &lib->incoming,
recv_msg->pid, tracking_msg->mmap_id,
&hdr, rbuf);
@@ -3919,7 +3832,7 @@ nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
res = NXT_UNIT_OK;
}
- pthread_mutex_unlock(&process->incoming.mutex);
+ pthread_mutex_unlock(&lib->incoming.mutex);
return res;
}
@@ -3979,8 +3892,8 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
int res;
void *start;
uint32_t size;
+ nxt_unit_impl_t *lib;
nxt_unit_mmaps_t *mmaps;
- nxt_unit_process_t *process;
nxt_unit_mmap_buf_t *b, **incoming_tail;
nxt_port_mmap_msg_t *mmap_msg, *end;
nxt_port_mmap_header_t *hdr;
@@ -3992,11 +3905,6 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
return NXT_UNIT_ERROR;
}
- process = nxt_unit_msg_get_process(ctx, recv_msg);
- if (nxt_slow_path(process == NULL)) {
- return NXT_UNIT_ERROR;
- }
-
mmap_msg = recv_msg->start;
end = nxt_pointer_to(recv_msg->start, recv_msg->size);
@@ -4023,7 +3931,9 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
b = recv_msg->incoming_buf;
mmap_msg = recv_msg->start;
- mmaps = &process->incoming;
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
+ mmaps = &lib->incoming;
pthread_mutex_lock(&mmaps->mutex);
@@ -4052,7 +3962,6 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
b->buf.free = start;
b->buf.end = b->buf.start + size;
b->hdr = hdr;
- b->process = process;
b = b->next;
@@ -4105,8 +4014,7 @@ nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id)
static void
-nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
- nxt_unit_process_t *process, nxt_port_mmap_header_t *hdr,
+nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, nxt_port_mmap_header_t *hdr,
void *start, uint32_t size)
{
int freed_chunks;
@@ -4132,12 +4040,10 @@ nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
if (hdr->src_pid == lib->pid && freed_chunks != 0) {
- nxt_atomic_fetch_add(&process->outgoing.allocated_chunks,
- -freed_chunks);
+ nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, -freed_chunks);
- nxt_unit_debug(ctx, "process %d allocated_chunks %d",
- process->pid,
- (int) process->outgoing.allocated_chunks);
+ nxt_unit_debug(ctx, "allocated_chunks %d",
+ (int) lib->outgoing.allocated_chunks);
}
if (hdr->dst_pid == lib->pid
@@ -4241,9 +4147,6 @@ nxt_unit_process_get(nxt_unit_impl_t *lib, pid_t pid)
nxt_queue_init(&process->ports);
- nxt_unit_mmaps_init(&process->incoming);
- nxt_unit_mmaps_init(&process->outgoing);
-
lhq.replace = 0;
lhq.value = process;
@@ -4255,8 +4158,6 @@ nxt_unit_process_get(nxt_unit_impl_t *lib, pid_t pid)
default:
nxt_unit_alert(NULL, "process %d insert failed", (int) pid);
- pthread_mutex_destroy(&process->outgoing.mutex);
- pthread_mutex_destroy(&process->incoming.mutex);
free(process);
process = NULL;
break;
@@ -4907,17 +4808,6 @@ nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port)
}
-nxt_inline nxt_unit_process_t *
-nxt_unit_port_process(nxt_unit_port_t *port)
-{
- nxt_unit_port_impl_t *port_impl;
-
- port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
-
- return port_impl->process;
-}
-
-
static nxt_unit_port_t *
nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
{