summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2020-08-11 19:20:06 +0300
committerMax Romanov <max.romanov@nginx.com>2020-08-11 19:20:06 +0300
commitbf647588ff781e606651f001b53a4e83bb34c000 (patch)
tree6113737bf5da1b6cc53e333deba3114c6b43ec0d
parentec3389b63bd7a9159d2be4a2863140f75095c7d3 (diff)
downloadunit-bf647588ff781e606651f001b53a4e83bb34c000.tar.gz
unit-bf647588ff781e606651f001b53a4e83bb34c000.tar.bz2
Adding a reference counter to the libunit port structure.
The goal is to minimize the number of (pid, id) to port hash lookups which require a library mutex lock. The response port is found once per request, while the read port is initialized at startup.
Diffstat (limited to '')
-rw-r--r--go/nxt_cgo_lib.c12
-rw-r--r--src/nxt_unit.c678
-rw-r--r--src/nxt_unit.h13
3 files changed, 346 insertions, 357 deletions
diff --git a/go/nxt_cgo_lib.c b/go/nxt_cgo_lib.c
index 1bb38f3c..937996b0 100644
--- a/go/nxt_cgo_lib.c
+++ b/go/nxt_cgo_lib.c
@@ -15,9 +15,9 @@ static nxt_cgo_str_t *nxt_cgo_str_init(nxt_cgo_str_t *dst,
nxt_unit_sptr_t *sptr, uint32_t length);
static int nxt_cgo_add_port(nxt_unit_ctx_t *, nxt_unit_port_t *port);
static void nxt_cgo_remove_port(nxt_unit_t *, nxt_unit_port_t *port);
-static ssize_t nxt_cgo_port_send(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id,
+static ssize_t nxt_cgo_port_send(nxt_unit_ctx_t *, nxt_unit_port_t *port,
const void *buf, size_t buf_size, const void *oob, size_t oob_size);
-static ssize_t nxt_cgo_port_recv(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id,
+static ssize_t nxt_cgo_port_recv(nxt_unit_ctx_t *, nxt_unit_port_t *port,
void *buf, size_t buf_size, void *oob, size_t oob_size);
static void nxt_cgo_shm_ack_handler(nxt_unit_ctx_t *ctx);
@@ -123,19 +123,19 @@ nxt_cgo_remove_port(nxt_unit_t *unit, nxt_unit_port_t *port)
static ssize_t
-nxt_cgo_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
+nxt_cgo_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
const void *buf, size_t buf_size, const void *oob, size_t oob_size)
{
- return nxt_go_port_send(port_id->pid, port_id->id,
+ return nxt_go_port_send(port->id.pid, port->id.id,
(void *) buf, buf_size, (void *) oob, oob_size);
}
static ssize_t
-nxt_cgo_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
+nxt_cgo_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
void *buf, size_t buf_size, void *oob, size_t oob_size)
{
- return nxt_go_port_recv(port_id->pid, port_id->id,
+ return nxt_go_port_recv(port->id.pid, port->id.id,
buf, buf_size, oob, oob_size);
}
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index 8c964c7a..ddfd9c80 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -70,7 +70,7 @@ static nxt_unit_process_t *nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx);
static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf);
-static int nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
+static int nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
nxt_unit_mmap_buf_t *mmap_buf, int last);
static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf);
static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf);
@@ -84,17 +84,16 @@ static nxt_unit_mmap_buf_t *nxt_unit_request_preread(
static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst,
size_t size);
static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx,
- nxt_unit_process_t *process, nxt_unit_port_id_t *port_id,
- nxt_chunk_id_t *c, int *n, int min_n);
-static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
+ nxt_unit_port_t *port, nxt_chunk_id_t *c, int *n, int min_n);
+static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx);
static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i);
static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx,
- nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, int n);
-static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
+ nxt_unit_port_t *port, int n);
+static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
int fd);
static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx,
- nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, uint32_t size,
+ nxt_unit_port_t *port, uint32_t size,
uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf);
static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);
@@ -121,34 +120,36 @@ static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
static void nxt_unit_read_buf(nxt_unit_ctx_t *ctx,
nxt_unit_read_buf_t *rbuf);
static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl);
-static int nxt_unit_create_port(nxt_unit_ctx_t *ctx,
- nxt_unit_port_id_t *port_id, int *fd);
+static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx);
-static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst,
- nxt_unit_port_id_t *new_port, int fd);
+static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
+ nxt_unit_port_t *port);
-static int nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
-static int nxt_unit_remove_port(nxt_unit_impl_t *lib,
+nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port);
+nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port);
+nxt_inline nxt_unit_process_t *nxt_unit_port_process(nxt_unit_port_t *port);
+static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx,
+ nxt_unit_port_t *port);
+static void nxt_unit_remove_port(nxt_unit_impl_t *lib,
+ nxt_unit_port_id_t *port_id);
+static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib,
nxt_unit_port_id_t *port_id);
-static int nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib,
- nxt_unit_port_id_t *port_id, nxt_unit_port_t **r_port,
- nxt_unit_process_t **process);
static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid);
static void nxt_unit_remove_process(nxt_unit_impl_t *lib,
nxt_unit_process_t *process);
static void nxt_unit_quit(nxt_unit_ctx_t *ctx);
static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx,
- nxt_unit_port_id_t *port_id, const void *buf, size_t buf_size,
+ nxt_unit_port_t *port, const void *buf, size_t buf_size,
const void *oob, size_t oob_size);
static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
const void *buf, size_t buf_size, const void *oob, size_t oob_size);
-static ssize_t nxt_unit_port_recv_default(nxt_unit_ctx_t *ctx,
- nxt_unit_port_id_t *port_id, void *buf, size_t buf_size,
+static ssize_t nxt_unit_port_recv(nxt_unit_ctx_t *ctx,
+ nxt_unit_port_t *port, void *buf, size_t buf_size,
void *oob, size_t oob_size);
static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
nxt_unit_port_t *port);
-static nxt_unit_port_impl_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
+static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
nxt_unit_port_id_t *port_id, int remove);
static int nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash,
@@ -166,7 +167,6 @@ struct nxt_unit_mmap_buf_s {
nxt_unit_mmap_buf_t **prev;
nxt_port_mmap_header_t *hdr;
- nxt_unit_port_id_t port_id;
nxt_unit_request_info_t *req;
nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_process_t *process;
@@ -247,8 +247,7 @@ struct nxt_unit_ctx_impl_s {
pthread_mutex_t mutex;
- nxt_unit_port_id_t read_port_id;
- int read_port_fd;
+ nxt_unit_port_t *read_port;
nxt_queue_link_t link;
@@ -291,7 +290,7 @@ struct nxt_unit_impl_s {
nxt_lvlhsh_t processes; /* of nxt_unit_process_t */
nxt_lvlhsh_t ports; /* of nxt_unit_port_impl_t */
- nxt_unit_port_id_t router_port_id;
+ nxt_unit_port_t *router_port;
nxt_queue_t contexts; /* of nxt_unit_ctx_impl_t */
@@ -306,6 +305,8 @@ struct nxt_unit_impl_s {
struct nxt_unit_port_impl_s {
nxt_unit_port_t port;
+ nxt_atomic_t use_count;
+
nxt_queue_link_t link;
nxt_unit_process_t *process;
};
@@ -395,26 +396,23 @@ nxt_unit_init(nxt_unit_init_t *init)
}
lib->pid = read_port.id.pid;
+
ctx = &lib->main_ctx.ctx;
- rc = nxt_unit_add_port(ctx, &router_port);
- if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ lib->router_port = nxt_unit_add_port(ctx, &router_port);
+ if (nxt_slow_path(lib->router_port == NULL)) {
nxt_unit_alert(NULL, "failed to add router_port");
goto fail;
}
- lib->router_port_id = router_port.id;
-
- rc = nxt_unit_add_port(ctx, &read_port);
- if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port);
+ if (nxt_slow_path(lib->main_ctx.read_port == NULL)) {
nxt_unit_alert(NULL, "failed to add read_port");
goto fail;
}
- lib->main_ctx.read_port_id = read_port.id;
-
rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
nxt_unit_alert(NULL, "failed to send READY message");
@@ -428,7 +426,7 @@ nxt_unit_init(nxt_unit_init_t *init)
fail:
- free(lib);
+ nxt_unit_ctx_release(&lib->main_ctx);
return NULL;
}
@@ -471,6 +469,7 @@ nxt_unit_create(nxt_unit_init_t *init)
nxt_queue_init(&lib->contexts);
lib->use_count = 0;
+ lib->router_port = NULL;
rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
@@ -485,10 +484,6 @@ nxt_unit_create(nxt_unit_init_t *init)
goto fail;
}
- if (cb->port_recv == NULL) {
- cb->port_recv = nxt_unit_port_recv_default;
- }
-
return lib;
fail:
@@ -539,7 +534,7 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
ctx_impl->req.req.ctx = &ctx_impl->ctx;
ctx_impl->req.req.unit = &lib->unit;
- ctx_impl->read_port_fd = -1;
+ ctx_impl->read_port = NULL;
ctx_impl->requests.slot = 0;
return NXT_UNIT_OK;
@@ -597,6 +592,10 @@ nxt_unit_lib_release(nxt_unit_impl_t *lib)
pthread_mutex_destroy(&lib->mutex);
+ if (nxt_fast_path(lib->router_port != NULL)) {
+ nxt_unit_port_release(lib->router_port);
+ }
+
free(lib);
}
}
@@ -751,7 +750,7 @@ nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream)
int
-nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
+nxt_unit_process_msg(nxt_unit_ctx_t *ctx,
void *buf, size_t buf_size, void *oob, size_t oob_size)
{
int rc;
@@ -917,7 +916,7 @@ static int
nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
int nb;
- nxt_unit_port_t new_port;
+ nxt_unit_port_t new_port, *port;
nxt_port_msg_new_port_t *new_port_msg;
if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) {
@@ -960,7 +959,14 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
recv_msg->fd = -1;
- return nxt_unit_add_port(ctx, &new_port);
+ port = nxt_unit_add_port(ctx, &new_port);
+ if (nxt_slow_path(port == NULL)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ nxt_unit_port_release(port);
+
+ return NXT_UNIT_OK;
}
@@ -968,6 +974,8 @@ static int
nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
nxt_unit_impl_t *lib;
+ nxt_unit_port_t *port;
+ nxt_unit_port_id_t port_id;
nxt_unit_request_t *r;
nxt_unit_mmap_buf_t *b;
nxt_unit_request_info_t *req;
@@ -996,10 +1004,27 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
return NXT_UNIT_ERROR;
}
+ nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port);
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
+ pthread_mutex_lock(&lib->mutex);
+
+ port = nxt_unit_port_hash_find(&lib->ports, &port_id, 0);
+
+ pthread_mutex_unlock(&lib->mutex);
+
+ if (nxt_slow_path(port == NULL)) {
+ nxt_unit_alert(ctx, "#%"PRIu32": response port %d,%d not found",
+ recv_msg->stream,
+ (int) recv_msg->pid, (int) recv_msg->reply_port);
+
+ return NXT_UNIT_ERROR;
+ }
+
req = &req_impl->req;
- nxt_unit_port_id_init(&req->response_port, recv_msg->pid,
- recv_msg->reply_port);
+ req->response_port = port;
req->request = recv_msg->start;
@@ -1051,8 +1076,6 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
(char *) nxt_unit_sptr_get(&r->target),
(int) r->content_length);
- lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
-
lib->callbacks.request_handler(req);
return NXT_UNIT_OK;
@@ -1275,6 +1298,12 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req)
req_impl->process = NULL;
}
+ if (req->response_port != NULL) {
+ nxt_unit_port_release(req->response_port);
+
+ req->response_port = NULL;
+ }
+
pthread_mutex_lock(&ctx_impl->mutex);
nxt_queue_remove(&req_impl->link);
@@ -1793,7 +1822,7 @@ nxt_unit_response_send(nxt_unit_request_info_t *req)
mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf);
- rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 0);
+ rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
if (nxt_fast_path(rc == NXT_UNIT_OK)) {
req->response = NULL;
req->response_buf = NULL;
@@ -1846,8 +1875,8 @@ nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf);
- rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
- &req->response_port, size, size, mmap_buf,
+ rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
+ size, size, mmap_buf,
NULL);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
nxt_unit_mmap_buf_release(mmap_buf);
@@ -2035,7 +2064,7 @@ nxt_unit_buf_send(nxt_unit_buf_t *buf)
}
if (nxt_fast_path(buf->free > buf->start)) {
- rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 0);
+ rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
return rc;
}
@@ -2050,17 +2079,15 @@ nxt_unit_buf_send(nxt_unit_buf_t *buf)
static void
nxt_unit_buf_send_done(nxt_unit_buf_t *buf)
{
- int rc;
- nxt_unit_mmap_buf_t *mmap_buf;
- nxt_unit_request_info_t *req;
- nxt_unit_request_info_impl_t *req_impl;
+ int rc;
+ nxt_unit_mmap_buf_t *mmap_buf;
+ nxt_unit_request_info_t *req;
mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
req = mmap_buf->req;
- req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
- rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 1);
+ rc = nxt_unit_mmap_buf_send(req, mmap_buf, 1);
if (nxt_slow_path(rc == NXT_UNIT_OK)) {
nxt_unit_mmap_buf_free(mmap_buf);
@@ -2073,7 +2100,7 @@ nxt_unit_buf_send_done(nxt_unit_buf_t *buf)
static int
-nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
+nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
nxt_unit_mmap_buf_t *mmap_buf, int last)
{
struct {
@@ -2081,22 +2108,24 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
nxt_port_mmap_msg_t mmap_msg;
} m;
- int rc;
- u_char *last_used, *first_free;
- ssize_t res;
- nxt_chunk_id_t first_free_chunk;
- nxt_unit_buf_t *buf;
- nxt_unit_impl_t *lib;
- nxt_port_mmap_header_t *hdr;
+ int rc;
+ u_char *last_used, *first_free;
+ ssize_t res;
+ nxt_chunk_id_t first_free_chunk;
+ nxt_unit_buf_t *buf;
+ nxt_unit_impl_t *lib;
+ nxt_port_mmap_header_t *hdr;
+ nxt_unit_request_info_impl_t *req_impl;
- lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
+ req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
buf = &mmap_buf->buf;
hdr = mmap_buf->hdr;
m.mmap_msg.size = buf->free - buf->start;
- m.msg.stream = stream;
+ m.msg.stream = req_impl->stream;
m.msg.pid = lib->pid;
m.msg.reply_port = 0;
m.msg.type = _NXT_PORT_MSG_DATA;
@@ -2113,13 +2142,13 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr,
(u_char *) buf->start);
- nxt_unit_debug(ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
- stream,
+ nxt_unit_debug(req->ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
+ req_impl->stream,
(int) m.mmap_msg.mmap_id,
(int) m.mmap_msg.chunk_id,
(int) m.mmap_msg.size);
- res = nxt_unit_port_send(ctx, &mmap_buf->port_id, &m, sizeof(m),
+ res = nxt_unit_port_send(req->ctx, req->response_port, &m, sizeof(m),
NULL, 0);
if (nxt_slow_path(res != sizeof(m))) {
goto free_buf;
@@ -2149,7 +2178,7 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
nxt_atomic_fetch_add(&mmap_buf->process->outgoing.allocated_chunks,
(int) m.mmap_msg.chunk_id - (int) first_free_chunk);
- nxt_unit_debug(ctx, "process %d allocated_chunks %d",
+ nxt_unit_debug(req->ctx, "process %d allocated_chunks %d",
mmap_buf->process->pid,
(int) mmap_buf->process->outgoing.allocated_chunks);
@@ -2157,19 +2186,21 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
if (nxt_slow_path(mmap_buf->plain_ptr == NULL
|| mmap_buf->plain_ptr > buf->start - sizeof(m.msg)))
{
- nxt_unit_warn(ctx, "#%"PRIu32": failed to send plain memory buffer"
- ": no space reserved for message header", stream);
+ nxt_unit_alert(req->ctx,
+ "#%"PRIu32": failed to send plain memory buffer"
+ ": no space reserved for message header",
+ req_impl->stream);
goto free_buf;
}
memcpy(buf->start - sizeof(m.msg), &m.msg, sizeof(m.msg));
- nxt_unit_debug(ctx, "#%"PRIu32": send plain: %d",
- stream,
+ nxt_unit_debug(req->ctx, "#%"PRIu32": send plain: %d",
+ req_impl->stream,
(int) (sizeof(m.msg) + m.mmap_msg.size));
- res = nxt_unit_port_send(ctx, &mmap_buf->port_id,
+ res = nxt_unit_port_send(req->ctx, req->response_port,
buf->start - sizeof(m.msg),
m.mmap_msg.size + sizeof(m.msg),
NULL, 0);
@@ -2337,7 +2368,7 @@ nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start,
sent = 0;
if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
- nxt_unit_req_warn(req, "write: response not initialized yet");
+ nxt_unit_req_alert(req, "write: response not initialized yet");
return -NXT_UNIT_ERROR;
}
@@ -2369,8 +2400,7 @@ nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start,
min_part_size = nxt_min(min_size, part_size);
min_part_size = nxt_min(min_part_size, PORT_MMAP_CHUNK_SIZE);
- rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
- &req->response_port, part_size,
+ rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, part_size,
min_part_size, &mmap_buf, local_buf);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
return -rc;
@@ -2385,7 +2415,7 @@ nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start,
mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free,
part_start, part_size);
- rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0);
+ rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
return -rc;
}
@@ -2415,8 +2445,14 @@ nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
+ if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
+ nxt_unit_req_alert(req, "write: response not initialized yet");
+
+ return NXT_UNIT_ERROR;
+ }
+
/* Check if response is not send yet. */
- if (nxt_slow_path(req->response_buf)) {
+ if (nxt_slow_path(req->response_buf != NULL)) {
/* Enable content in headers buf. */
rc = nxt_unit_response_add_content(req, "", 0);
@@ -2463,8 +2499,7 @@ nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
buf_size = nxt_min(read_info->buf_size, PORT_MMAP_DATA_SIZE);
- rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
- &req->response_port,
+ rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
buf_size, buf_size,
&mmap_buf, local_buf);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
@@ -2486,7 +2521,7 @@ nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
buf->free += n;
}
- rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0);
+ rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
nxt_unit_req_error(req, "Failed to send content");
@@ -2744,7 +2779,7 @@ skip_response_send:
msg.mf = 0;
msg.tracking = 0;
- (void) nxt_unit_port_send(req->ctx, &req->response_port,
+ (void) nxt_unit_port_send(req->ctx, req->response_port,
&msg, sizeof(msg), NULL, 0);
nxt_unit_request_info_release(req);
@@ -2765,17 +2800,14 @@ int
nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
uint8_t last, const struct iovec *iov, int iovcnt)
{
- int i, rc;
- size_t l, copy;
- uint32_t payload_len, buf_size, alloc_size;
- const uint8_t *b;
- nxt_unit_buf_t *buf;
- nxt_unit_mmap_buf_t mmap_buf;
- nxt_websocket_header_t *wh;
- nxt_unit_request_info_impl_t *req_impl;
- char local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
-
- req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
+ int i, rc;
+ size_t l, copy;
+ uint32_t payload_len, buf_size, alloc_size;
+ const uint8_t *b;
+ nxt_unit_buf_t *buf;
+ nxt_unit_mmap_buf_t mmap_buf;
+ nxt_websocket_header_t *wh;
+ char local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
payload_len = 0;
@@ -2786,8 +2818,7 @@ nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
buf_size = 10 + payload_len;
alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);
- rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
- &req->response_port,
+ rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
alloc_size, alloc_size,
&mmap_buf, local_buf);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
@@ -2821,8 +2852,7 @@ nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
if (l > 0) {
if (nxt_fast_path(buf->free > buf->start)) {
- rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream,
- &mmap_buf, 0);
+ rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
return rc;
@@ -2831,8 +2861,7 @@ nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);
- rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
- &req->response_port,
+ rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
alloc_size, alloc_size,
&mmap_buf, local_buf);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
@@ -2845,8 +2874,7 @@ nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
}
if (buf->free > buf->start) {
- rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream,
- &mmap_buf, 0);
+ rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
}
return rc;
@@ -2919,17 +2947,26 @@ nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws)
static nxt_port_mmap_header_t *
-nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
- nxt_unit_port_id_t *port_id, nxt_chunk_id_t *c, int *n, int min_n)
+nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
+ nxt_chunk_id_t *c, int *n, int min_n)
{
int res, nchunks, i;
uint32_t outgoing_size;
nxt_unit_mmap_t *mm, *mm_end;
nxt_unit_impl_t *lib;
+ nxt_unit_process_t *process;
nxt_port_mmap_header_t *hdr;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ process = nxt_unit_port_process(port);
+ if (nxt_slow_path(process == NULL)) {
+ nxt_unit_alert(ctx, "mmap_get: port %d,%d already closed",
+ (int) port->id.pid, (int) port->id.id);
+
+ return NULL;
+ }
+
pthread_mutex_lock(&process->outgoing.mutex);
retry:
@@ -2941,7 +2978,7 @@ retry:
for (mm = process->outgoing.elts; mm < mm_end; mm++) {
hdr = mm->hdr;
- if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port_id->id) {
+ if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port->id.id) {
continue;
}
@@ -3000,7 +3037,7 @@ retry:
/* Notify router about OOSM condition. */
- res = nxt_unit_send_oosm(ctx, port_id);
+ res = nxt_unit_send_oosm(ctx, port);
if (nxt_slow_path(res != NXT_UNIT_OK)) {
return NULL;
}
@@ -3026,7 +3063,7 @@ retry:
}
*c = 0;
- hdr = nxt_unit_new_mmap(ctx, process, port_id, *n);
+ hdr = nxt_unit_new_mmap(ctx, port, *n);
unlock:
@@ -3043,7 +3080,7 @@ unlock:
static int
-nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
+nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
{
ssize_t res;
nxt_port_msg_t msg;
@@ -3061,7 +3098,7 @@ nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
msg.mf = 0;
msg.tracking = 0;
- res = nxt_unit_port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0);
+ res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL, 0);
if (nxt_slow_path(res != sizeof(msg))) {
return NXT_UNIT_ERROR;
}
@@ -3163,21 +3200,29 @@ nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i)
static nxt_port_mmap_header_t *
-nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
- nxt_unit_port_id_t *port_id, int n)
+nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n)
{
int i, fd, rc;
void *mem;
char name[64];
nxt_unit_mmap_t *mm;
nxt_unit_impl_t *lib;
+ nxt_unit_process_t *process;
nxt_port_mmap_header_t *hdr;
- lib = process->lib;
+ process = nxt_unit_port_process(port);
+ if (nxt_slow_path(process == NULL)) {
+ nxt_unit_alert(ctx, "new_mmap: port %d,%d already closed",
+ (int) port->id.pid, (int) port->id.id);
+
+ return NULL;
+ }
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
mm = nxt_unit_mmap_at(&process->outgoing, process->outgoing.size);
if (nxt_slow_path(mm == NULL)) {
- nxt_unit_warn(ctx, "failed to add mmap to outgoing array");
+ nxt_unit_alert(ctx, "failed to add mmap to outgoing array");
return NULL;
}
@@ -3255,7 +3300,7 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
hdr->id = process->outgoing.size - 1;
hdr->src_pid = lib->pid;
hdr->dst_pid = process->pid;
- hdr->sent_over = port_id->id;
+ hdr->sent_over = port->id.id;
/* Mark first n chunk(s) as busy */
for (i = 0; i < n; i++) {
@@ -3268,7 +3313,7 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
pthread_mutex_unlock(&process->outgoing.mutex);
- rc = nxt_unit_send_mmap(ctx, port_id, fd);
+ rc = nxt_unit_send_mmap(ctx, port, fd);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
munmap(mem, PORT_MMAP_SIZE);
hdr = NULL;
@@ -3295,7 +3340,7 @@ remove_fail:
static int
-nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd)
+nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int fd)
{
ssize_t res;
nxt_port_msg_t msg;
@@ -3339,7 +3384,7 @@ nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd)
*/
memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int));
- res = nxt_unit_port_send(ctx, port_id, &msg, sizeof(msg),
+ res = nxt_unit_port_send(ctx, port, &msg, sizeof(msg),
&cmsg, sizeof(cmsg));
if (nxt_slow_path(res != sizeof(msg))) {
return NXT_UNIT_ERROR;
@@ -3350,8 +3395,8 @@ nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd)
static int
-nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
- nxt_unit_port_id_t *port_id, uint32_t size, uint32_t min_size,
+nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
+ uint32_t size, uint32_t min_size,
nxt_unit_mmap_buf_t *mmap_buf, char *local_buf)
{
int nchunks, min_nchunks;
@@ -3376,8 +3421,7 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
mmap_buf->buf.start = mmap_buf->plain_ptr + sizeof(nxt_port_msg_t);
mmap_buf->buf.free = mmap_buf->buf.start;
mmap_buf->buf.end = mmap_buf->buf.start + size;
- mmap_buf->port_id = *port_id;
- mmap_buf->process = process;
+ mmap_buf->process = nxt_unit_port_process(port);
nxt_unit_debug(ctx, "outgoing plain buffer allocation: (%p, %d)",
mmap_buf->buf.start, (int) size);
@@ -3388,7 +3432,7 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
min_nchunks = (min_size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
- hdr = nxt_unit_mmap_get(ctx, process, port_id, &c, &nchunks, min_nchunks);
+ hdr = nxt_unit_mmap_get(ctx, port, &c, &nchunks, min_nchunks);
if (nxt_slow_path(hdr == NULL)) {
if (nxt_fast_path(min_nchunks == 0 && nchunks == 0)) {
mmap_buf->hdr = NULL;
@@ -3407,8 +3451,7 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
mmap_buf->buf.start = (char *) nxt_port_mmap_chunk_start(hdr, c);
mmap_buf->buf.free = mmap_buf->buf.start;
mmap_buf->buf.end = mmap_buf->buf.start + nchunks * PORT_MMAP_CHUNK_SIZE;
- mmap_buf->port_id = *port_id;
- mmap_buf->process = process;
+ mmap_buf->process = nxt_unit_port_process(port);
mmap_buf->free_ptr = NULL;
mmap_buf->ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
@@ -3770,15 +3813,12 @@ nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
static int
nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid)
{
- ssize_t res;
- nxt_port_msg_t msg;
- nxt_unit_impl_t *lib;
- nxt_unit_port_id_t port_id;
+ ssize_t res;
+ nxt_port_msg_t msg;
+ nxt_unit_impl_t *lib;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
- nxt_unit_port_id_init(&port_id, pid, 0);
-
msg.stream = 0;
msg.pid = lib->pid;
msg.reply_port = 0;
@@ -3789,7 +3829,7 @@ nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid)
msg.mf = 0;
msg.tracking = 0;
- res = nxt_unit_port_send(ctx, &port_id, &msg, sizeof(msg), NULL, 0);
+ res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL, 0);
if (nxt_slow_path(res != sizeof(msg))) {
return NXT_UNIT_ERROR;
}
@@ -3893,7 +3933,6 @@ static nxt_unit_process_t *
nxt_unit_process_find(nxt_unit_impl_t *lib, pid_t pid, int remove)
{
int rc;
- nxt_unit_process_t *process;
nxt_lvlhsh_query_t lhq;
nxt_unit_process_lhq_pid(&lhq, &pid);
@@ -3906,13 +3945,11 @@ nxt_unit_process_find(nxt_unit_impl_t *lib, pid_t pid, int remove)
}
if (rc == NXT_OK) {
- process = lhq.value;
-
if (!remove) {
- nxt_unit_process_use(process);
+ nxt_unit_process_use(lhq.value);
}
- return process;
+ return lhq.value;
}
return NULL;
@@ -3990,7 +4027,7 @@ nxt_unit_run_once(nxt_unit_ctx_t *ctx)
}
if (nxt_fast_path(rbuf->size > 0)) {
- rc = nxt_unit_process_msg(ctx, &ctx_impl->read_port_id,
+ rc = nxt_unit_process_msg(ctx,
rbuf->buf, rbuf->size,
rbuf->oob, sizeof(rbuf->oob));
@@ -4013,25 +4050,15 @@ nxt_unit_run_once(nxt_unit_ctx_t *ctx)
static void
nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
{
- nxt_unit_impl_t *lib;
nxt_unit_ctx_impl_t *ctx_impl;
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
memset(rbuf->oob, 0, sizeof(struct cmsghdr));
- if (ctx_impl->read_port_fd != -1) {
- rbuf->size = nxt_unit_port_recv(ctx, ctx_impl->read_port_fd,
- rbuf->buf, sizeof(rbuf->buf),
- rbuf->oob, sizeof(rbuf->oob));
-
- } else {
- lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
-
- rbuf->size = lib->callbacks.port_recv(ctx, &ctx_impl->read_port_id,
- rbuf->buf, sizeof(rbuf->buf),
- rbuf->oob, sizeof(rbuf->oob));
- }
+ rbuf->size = nxt_unit_port_recv(ctx, ctx_impl->read_port,
+ rbuf->buf, sizeof(rbuf->buf),
+ rbuf->oob, sizeof(rbuf->oob));
}
@@ -4049,52 +4076,49 @@ nxt_unit_done(nxt_unit_ctx_t *ctx)
nxt_unit_ctx_t *
nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
{
- int rc, fd;
+ int rc;
nxt_unit_impl_t *lib;
- nxt_unit_port_id_t new_port_id;
+ nxt_unit_port_t *port;
nxt_unit_ctx_impl_t *new_ctx;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
new_ctx = malloc(sizeof(nxt_unit_ctx_impl_t) + lib->request_data_size);
if (nxt_slow_path(new_ctx == NULL)) {
- nxt_unit_warn(ctx, "failed to allocate context");
+ nxt_unit_alert(ctx, "failed to allocate context");
return NULL;
}
- rc = nxt_unit_create_port(ctx, &new_port_id, &fd);
- if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ port = nxt_unit_create_port(ctx);
+ if (nxt_slow_path(port == NULL)) {
free(new_ctx);
return NULL;
}
- rc = nxt_unit_send_port(ctx, &lib->router_port_id, &new_port_id, fd);
+ rc = nxt_unit_send_port(ctx, lib->router_port, port);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
- nxt_unit_remove_port(lib, &new_port_id);
-
- close(fd);
-
- free(new_ctx);
-
- return NULL;
+ goto fail;
}
- close(fd);
-
rc = nxt_unit_ctx_init(lib, new_ctx, data);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
- nxt_unit_remove_port(lib, &new_port_id);
-
- free(new_ctx);
-
- return NULL;
+ goto fail;
}
- new_ctx->read_port_id = new_port_id;
+ new_ctx->read_port = port;
return &new_ctx->ctx;
+
+fail:
+
+ nxt_unit_remove_port(lib, &port->id);
+ nxt_unit_port_release(port);
+
+ free(new_ctx);
+
+ return NULL;
}
@@ -4144,6 +4168,10 @@ nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl)
nxt_queue_remove(&ctx_impl->link);
+ if (nxt_fast_path(ctx_impl->read_port != NULL)) {
+ nxt_unit_port_release(ctx_impl->read_port);
+ }
+
if (ctx_impl != &lib->main_ctx) {
free(ctx_impl);
}
@@ -4174,12 +4202,12 @@ nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id)
}
-static int
-nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd)
+static nxt_unit_port_t *
+nxt_unit_create_port(nxt_unit_ctx_t *ctx)
{
int rc, port_sockets[2];
nxt_unit_impl_t *lib;
- nxt_unit_port_t new_port;
+ nxt_unit_port_t new_port, *port;
nxt_unit_process_t *process;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
@@ -4189,7 +4217,7 @@ nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd)
nxt_unit_warn(ctx, "create_port: socketpair() failed: %s (%d)",
strerror(errno), errno);
- return NXT_UNIT_ERROR;
+ return NULL;
}
nxt_unit_debug(ctx, "create_port: new socketpair: %d->%d",
@@ -4204,39 +4232,34 @@ nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd)
close(port_sockets[0]);
close(port_sockets[1]);
- return NXT_UNIT_ERROR;
+ return NULL;
}
nxt_unit_port_id_init(&new_port.id, lib->pid, process->next_port_id++);
new_port.in_fd = port_sockets[0];
- new_port.out_fd = -1;
+ new_port.out_fd = port_sockets[1];
new_port.data = NULL;
pthread_mutex_unlock(&lib->mutex);
nxt_unit_process_release(process);
- rc = nxt_unit_add_port(ctx, &new_port);
- if (nxt_slow_path(rc != NXT_UNIT_OK)) {
- nxt_unit_warn(ctx, "create_port: add_port() failed");
+ port = nxt_unit_add_port(ctx, &new_port);
+ if (nxt_slow_path(port == NULL)) {
+ nxt_unit_alert(ctx, "create_port: add_port() failed");
close(port_sockets[0]);
close(port_sockets[1]);
-
- return rc;
}
- *port_id = new_port.id;
- *fd = port_sockets[1];
-
- return rc;
+ return port;
}
static int
-nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst,
- nxt_unit_port_id_t *new_port, int fd)
+nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
+ nxt_unit_port_t *port)
{
ssize_t res;
nxt_unit_impl_t *lib;
@@ -4263,8 +4286,8 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst,
m.msg.mf = 0;
m.msg.tracking = 0;
- m.new_port.id = new_port->id;
- m.new_port.pid = new_port->pid;
+ m.new_port.id = port->id.id;
+ m.new_port.pid = port->id.pid;
m.new_port.type = NXT_PROCESS_APP;
m.new_port.max_size = 16 * 1024;
m.new_port.max_share = 64 * 1024;
@@ -4284,7 +4307,7 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst,
* Fortunately, GCC with -O1 compiles this nxt_memcpy()
* in the same simple assignment as in the code above.
*/
- memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int));
+ memcpy(CMSG_DATA(&cmsg.cm), &port->out_fd, sizeof(int));
res = nxt_unit_port_send(ctx, dst, &m, sizeof(m), &cmsg, sizeof(cmsg));
@@ -4292,13 +4315,67 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst,
}
-static int
+nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port)
+{
+ nxt_unit_port_impl_t *port_impl;
+
+ port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
+
+ nxt_atomic_fetch_add(&port_impl->use_count, 1);
+}
+
+
+nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port)
+{
+ long c;
+ nxt_unit_port_impl_t *port_impl;
+
+ port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
+
+ c = nxt_atomic_fetch_add(&port_impl->use_count, -1);
+
+ if (c == 1) {
+ nxt_unit_debug(NULL, "destroy port %d,%d",
+ (int) port->id.pid, (int) port->id.id);
+
+ nxt_unit_process_release(port_impl->process);
+
+ if (port->in_fd != -1) {
+ close(port->in_fd);
+
+ port->in_fd = -1;
+ }
+
+ if (port->out_fd != -1) {
+ close(port->out_fd);
+
+ port->out_fd = -1;
+ }
+
+ free(port_impl);
+ }
+}
+
+
+nxt_inline nxt_unit_process_t *
+nxt_unit_port_process(nxt_unit_port_t *port)
+{
+ nxt_unit_port_impl_t *port_impl;
+
+ port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
+
+ return port_impl->process;
+}
+
+
+static nxt_unit_port_t *
nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
{
int rc;
nxt_unit_impl_t *lib;
+ nxt_unit_port_t *old_port;
nxt_unit_process_t *process;
- nxt_unit_port_impl_t *new_port, *old_port;
+ nxt_unit_port_impl_t *new_port;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
@@ -4311,13 +4388,13 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
port->id.pid, port->id.id,
port->in_fd, port->out_fd);
- if (old_port->port.data == NULL) {
- old_port->port.data = port->data;
+ if (old_port->data == NULL) {
+ old_port->data = port->data;
port->data = NULL;
}
- if (old_port->port.in_fd == -1) {
- old_port->port.in_fd = port->in_fd;
+ if (old_port->in_fd == -1) {
+ old_port->in_fd = port->in_fd;
port->in_fd = -1;
}
@@ -4326,8 +4403,8 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
port->in_fd = -1;
}
- if (old_port->port.out_fd == -1) {
- old_port->port.out_fd = port->out_fd;
+ if (old_port->out_fd == -1) {
+ old_port->out_fd = port->out_fd;
port->out_fd = -1;
}
@@ -4336,26 +4413,27 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
port->out_fd = -1;
}
- *port = old_port->port;
+ *port = *old_port;
pthread_mutex_unlock(&lib->mutex);
if (lib->callbacks.add_port != NULL
&& (port->in_fd != -1 || port->out_fd != -1))
{
- lib->callbacks.add_port(ctx, &old_port->port);
+ lib->callbacks.add_port(ctx, old_port);
}
- return NXT_UNIT_OK;
+ return old_port;
}
+ new_port = NULL;
+
nxt_unit_debug(ctx, "add_port: %d,%d in_fd %d out_fd %d",
port->id.pid, port->id.id,
port->in_fd, port->out_fd);
process = nxt_unit_process_get(lib, port->id.pid);
if (nxt_slow_path(process == NULL)) {
- rc = NXT_UNIT_ERROR;
goto unlock;
}
@@ -4365,7 +4443,6 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
new_port = malloc(sizeof(nxt_unit_port_impl_t));
if (nxt_slow_path(new_port == NULL)) {
- rc = NXT_UNIT_ERROR;
goto unlock;
}
@@ -4376,107 +4453,85 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
nxt_unit_alert(ctx, "add_port: %d,%d hash_add failed",
port->id.pid, port->id.id);
+ free(new_port);
+
+ new_port = NULL;
+
goto unlock;
}
nxt_queue_insert_tail(&process->ports, &new_port->link);
- rc = NXT_UNIT_OK;
-
+ new_port->use_count = 2;
new_port->process = process;
+ process = NULL;
+
unlock:
pthread_mutex_unlock(&lib->mutex);
- if (nxt_slow_path(process != NULL && rc != NXT_UNIT_OK)) {
+ if (nxt_slow_path(process != NULL)) {
nxt_unit_process_release(process);
}
if (lib->callbacks.add_port != NULL
- && rc == NXT_UNIT_OK
+ && new_port != NULL
&& (port->in_fd != -1 || port->out_fd != -1))
{
lib->callbacks.add_port(ctx, &new_port->port);
}
- return rc;
+ return &new_port->port;
}
-static int
+static void
nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id)
{
- int res;
- nxt_unit_port_t *port;
- nxt_unit_process_t *process;
-
- port = NULL;
- process = NULL;
+ nxt_unit_port_t *port;
+ nxt_unit_port_impl_t *port_impl;
pthread_mutex_lock(&lib->mutex);
- res = nxt_unit_remove_port_unsafe(lib, port_id, &port, &process);
-
- pthread_mutex_unlock(&lib->mutex);
-
- if (lib->callbacks.remove_port != NULL && res == NXT_UNIT_OK) {
- lib->callbacks.remove_port(&lib->unit, port);
- }
+ port = nxt_unit_remove_port_unsafe(lib, port_id);
if (nxt_fast_path(port != NULL)) {
- if (port->in_fd != -1) {
- close(port->in_fd);
- }
+ port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
- if (port->out_fd != -1) {
- close(port->out_fd);
- }
+ nxt_queue_remove(&port_impl->link);
}
- if (nxt_slow_path(process != NULL)) {
- nxt_unit_process_release(process);
+ pthread_mutex_unlock(&lib->mutex);
+
+ if (lib->callbacks.remove_port != NULL && port != NULL) {
+ lib->callbacks.remove_port(&lib->unit, port);
}
if (nxt_fast_path(port != NULL)) {
- free(port);
+ nxt_unit_port_release(port);
}
-
- return res;
}
-static int
-nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id,
- nxt_unit_port_t **r_port, nxt_unit_process_t **process)
+static nxt_unit_port_t *
+nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id)
{
- nxt_unit_port_impl_t *port;
+ nxt_unit_port_t *port;
port = nxt_unit_port_hash_find(&lib->ports, port_id, 1);
if (nxt_slow_path(port == NULL)) {
nxt_unit_debug(NULL, "remove_port: port %d,%d not found",
(int) port_id->pid, (int) port_id->id);
- return NXT_UNIT_ERROR;
+ return NULL;
}
nxt_unit_debug(NULL, "remove_port: port %d,%d, fds %d,%d, data %p",
(int) port_id->pid, (int) port_id->id,
- port->port.in_fd, port->port.out_fd, port->port.data);
-
- if (port->process != NULL) {
- nxt_queue_remove(&port->link);
- }
+ port->in_fd, port->out_fd, port->data);
- if (process != NULL) {
- *process = port->process;
- }
-
- if (r_port != NULL) {
- *r_port = &port->port;
- }
-
- return NXT_UNIT_OK;
+ return port;
}
@@ -4516,12 +4571,7 @@ nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process)
nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {
- nxt_unit_process_release(process);
-
- /* To avoid unlink port. */
- port->process = NULL;
-
- nxt_unit_remove_port_unsafe(lib, &port->port.id, NULL, NULL);
+ nxt_unit_remove_port_unsafe(lib, &port->port.id);
} nxt_queue_loop;
@@ -4535,15 +4585,7 @@ nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process)
lib->callbacks.remove_port(&lib->unit, &port->port);
}
- if (port->port.in_fd != -1) {
- close(port->port.in_fd);
- }
-
- if (port->port.out_fd != -1) {
- close(port->port.out_fd);
- }
-
- free(port);
+ nxt_unit_port_release(&port->port);
} nxt_queue_loop;
@@ -4567,43 +4609,23 @@ nxt_unit_quit(nxt_unit_ctx_t *ctx)
static ssize_t
-nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
+nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
const void *buf, size_t buf_size, const void *oob, size_t oob_size)
{
- int fd;
- nxt_unit_impl_t *lib;
- nxt_unit_port_impl_t *port;
-
- lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
-
- pthread_mutex_lock(&lib->mutex);
-
- port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);
-
- if (nxt_fast_path(port != NULL && port->port.out_fd != -1)) {
- fd = port->port.out_fd;
-
- pthread_mutex_unlock(&lib->mutex);
-
- } else {
- pthread_mutex_unlock(&lib->mutex);
-
- nxt_unit_alert(ctx, "port_send: port %d,%d not found",
- (int) port_id->pid, (int) port_id->id);
-
- return -NXT_UNIT_ERROR;
- }
+ nxt_unit_impl_t *lib;
- nxt_unit_debug(ctx, "port_send: found port %d,%d fd %d",
- (int) port_id->pid, (int) port_id->id, fd);
+ nxt_unit_debug(ctx, "port_send: port %d,%d fd %d",
+ (int) port->id.pid, (int) port->id.id, port->out_fd);
- if (lib->callbacks.port_send == NULL) {
- return nxt_unit_sendmsg(ctx, fd, buf, buf_size, oob, oob_size);
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
- } else {
- return lib->callbacks.port_send(ctx, port_id, buf, buf_size,
+ if (lib->callbacks.port_send != NULL) {
+ return lib->callbacks.port_send(ctx, port, buf, buf_size,
oob, oob_size);
}
+
+ return nxt_unit_sendmsg(ctx, port->out_fd, buf, buf_size,
+ oob, oob_size);
}
@@ -4652,56 +4674,22 @@ retry:
static ssize_t
-nxt_unit_port_recv_default(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
+nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
void *buf, size_t buf_size, void *oob, size_t oob_size)
{
- int fd;
- nxt_unit_impl_t *lib;
- nxt_unit_ctx_impl_t *ctx_impl;
- nxt_unit_port_impl_t *port;
+ int fd;
+ ssize_t res;
+ struct iovec iov[1];
+ struct msghdr msg;
+ nxt_unit_impl_t *lib;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
- pthread_mutex_lock(&lib->mutex);
-
- port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);
-
- if (nxt_fast_path(port != NULL)) {
- fd = port->port.in_fd;
-
- } else {
- nxt_unit_debug(ctx, "port_recv: port %d,%d not found",
- (int) port_id->pid, (int) port_id->id);
- fd = -1;
+ if (lib->callbacks.port_recv != NULL) {
+ return lib->callbacks.port_recv(ctx, port,
+ buf, buf_size, oob, oob_size);
}
- pthread_mutex_unlock(&lib->mutex);
-
- if (nxt_slow_path(fd == -1)) {
- return -1;
- }
-
- nxt_unit_debug(ctx, "port_recv: found port %d,%d, fd %d",
- (int) port_id->pid, (int) port_id->id, fd);
-
- ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
-
- if (nxt_fast_path(port_id == &ctx_impl->read_port_id)) {
- ctx_impl->read_port_fd = fd;
- }
-
- return nxt_unit_port_recv(ctx, fd, buf, buf_size, oob, oob_size);
-}
-
-
-ssize_t
-nxt_unit_port_recv(nxt_unit_ctx_t *ctx, int fd, void *buf, size_t buf_size,
- void *oob, size_t oob_size)
-{
- ssize_t res;
- struct iovec iov[1];
- struct msghdr msg;
-
iov[0].iov_base = buf;
iov[0].iov_len = buf_size;
@@ -4713,6 +4701,8 @@ nxt_unit_port_recv(nxt_unit_ctx_t *ctx, int fd, void *buf, size_t buf_size,
msg.msg_control = oob;
msg.msg_controllen = oob_size;
+ fd = port->in_fd;
+
retry:
res = recvmsg(fd, &msg, 0);
@@ -4813,7 +4803,7 @@ nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, nxt_unit_port_t *port)
}
-static nxt_unit_port_impl_t *
+static nxt_unit_port_t *
nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id,
int remove)
{
@@ -4833,6 +4823,10 @@ nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id,
switch (res) {
case NXT_OK:
+ if (!remove) {
+ nxt_unit_port_use(lhq.value);
+ }
+
return lhq.value;
default:
diff --git a/src/nxt_unit.h b/src/nxt_unit.h
index fa1fa843..6723026f 100644
--- a/src/nxt_unit.h
+++ b/src/nxt_unit.h
@@ -91,8 +91,7 @@ struct nxt_unit_request_info_s {
nxt_unit_t *unit;
nxt_unit_ctx_t *ctx;
- nxt_unit_port_id_t request_port;
- nxt_unit_port_id_t response_port;
+ nxt_unit_port_t *response_port;
nxt_unit_request_t *request;
nxt_unit_buf_t *request_buf;
@@ -142,12 +141,12 @@ struct nxt_unit_callbacks_s {
void (*shm_ack_handler)(nxt_unit_ctx_t *);
/* Send data and control to process pid using port id. Optional. */
- ssize_t (*port_send)(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id,
+ ssize_t (*port_send)(nxt_unit_ctx_t *, nxt_unit_port_t *port,
const void *buf, size_t buf_size,
const void *oob, size_t oob_size);
/* Receive data on port id. Optional. */
- ssize_t (*port_recv)(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id,
+ ssize_t (*port_recv)(nxt_unit_ctx_t *, nxt_unit_port_t *port,
void *buf, size_t buf_size, void *oob, size_t oob_size);
};
@@ -195,7 +194,7 @@ nxt_unit_ctx_t *nxt_unit_init(nxt_unit_init_t *);
* from port socket should be initially processed by unit. This function
* may invoke other application-defined callback for message processing.
*/
-int nxt_unit_process_msg(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id,
+int nxt_unit_process_msg(nxt_unit_ctx_t *,
void *buf, size_t buf_size, void *oob, size_t oob_size);
/*
@@ -225,10 +224,6 @@ nxt_unit_ctx_t *nxt_unit_ctx_alloc(nxt_unit_ctx_t *, void *);
/* Initialize port_id, calculate hash. */
void nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id);
-/* Default 'port_recv' implementation. */
-ssize_t nxt_unit_port_recv(nxt_unit_ctx_t *, int fd, void *buf, size_t buf_size,
- void *oob, size_t oob_size);
-
/* Calculates hash for given field name. */
uint16_t nxt_unit_field_hash(const char* name, size_t name_length);