summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_unit.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nxt_unit.c')
-rw-r--r--src/nxt_unit.c1171
1 files changed, 921 insertions, 250 deletions
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index 66aadd98..1008a9d6 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -7,6 +7,8 @@
#include "nxt_main.h"
#include "nxt_port_memory_int.h"
+#include "nxt_port_queue.h"
+#include "nxt_app_queue.h"
#include "nxt_unit.h"
#include "nxt_unit_request.h"
@@ -50,12 +52,15 @@ nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf);
static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
nxt_unit_port_t *router_port, nxt_unit_port_t *read_port,
int *log_fd, uint32_t *stream, uint32_t *shm_limit);
-static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream);
+static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream,
+ int queue_fd);
static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
+static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx,
+ nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
nxt_unit_port_id_t *port_id);
static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req);
@@ -92,6 +97,7 @@ static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx);
static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i);
static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx,
nxt_unit_port_t *port, int n);
+static int nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size);
static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
int fd);
static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx,
@@ -103,8 +109,6 @@ static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process);
nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process);
static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps);
-static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx,
- nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf);
static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx,
nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id,
nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf);
@@ -124,18 +128,22 @@ static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx);
static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx);
static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx);
+nxt_inline int nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf);
+nxt_inline int nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf);
+nxt_inline int nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf);
+nxt_inline int nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf);
static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx,
nxt_unit_port_t *port);
static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl);
static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx);
static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
- nxt_unit_port_t *port);
+ nxt_unit_port_t *port, int queue_fd);
nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port);
nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port);
static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx,
- nxt_unit_port_t *port);
+ nxt_unit_port_t *port, void *queue);
static void nxt_unit_remove_port(nxt_unit_impl_t *lib,
nxt_unit_port_id_t *port_id);
static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib,
@@ -150,18 +158,28 @@ static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx,
const void *oob, size_t oob_size);
static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
const void *buf, size_t buf_size, const void *oob, size_t oob_size);
+static int nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
+ nxt_unit_read_buf_t *rbuf);
+nxt_inline void nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst,
+ nxt_unit_read_buf_t *src);
+static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
+ nxt_unit_read_buf_t *rbuf);
static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
nxt_unit_read_buf_t *rbuf);
+static int nxt_unit_port_queue_recv(nxt_unit_port_t *port,
+ nxt_unit_read_buf_t *rbuf);
+static int nxt_unit_app_queue_recv(nxt_unit_port_t *port,
+ nxt_unit_read_buf_t *rbuf);
static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
nxt_unit_port_t *port);
static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
nxt_unit_port_id_t *port_id, int remove);
-static int nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash,
- nxt_unit_request_info_impl_t *req_impl);
-static nxt_unit_request_info_impl_t *nxt_unit_request_hash_find(
- nxt_lvlhsh_t *request_hash, uint32_t stream, int remove);
+static int nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
+ nxt_unit_request_info_t *req);
+static nxt_unit_request_info_t *nxt_unit_request_hash_find(
+ nxt_unit_ctx_t *ctx, uint32_t stream, int remove);
static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level);
@@ -217,6 +235,7 @@ struct nxt_unit_request_info_impl_s {
nxt_unit_req_state_t state;
uint8_t websocket;
+ uint8_t in_hash;
/* for nxt_unit_ctx_impl_t.free_req or active_req */
nxt_queue_link_t link;
@@ -349,6 +368,11 @@ struct nxt_unit_port_impl_s {
nxt_queue_t awaiting_req;
int ready;
+
+ void *queue;
+
+ int from_socket;
+ nxt_unit_read_buf_t *socket_rbuf;
};
@@ -375,7 +399,8 @@ typedef struct {
nxt_unit_ctx_t *
nxt_unit_init(nxt_unit_init_t *init)
{
- int rc;
+ int rc, queue_fd;
+ void *mem;
uint32_t ready_stream, shm_limit;
nxt_unit_ctx_t *ctx;
nxt_unit_impl_t *lib;
@@ -386,6 +411,8 @@ nxt_unit_init(nxt_unit_init_t *init)
return NULL;
}
+ queue_fd = -1;
+
if (init->ready_port.id.pid != 0
&& init->ready_stream != 0
&& init->read_port.id.pid != 0)
@@ -422,33 +449,58 @@ nxt_unit_init(nxt_unit_init_t *init)
ctx = &lib->main_ctx.ctx;
- lib->router_port = nxt_unit_add_port(ctx, &router_port);
+ lib->router_port = nxt_unit_add_port(ctx, &router_port, NULL);
if (nxt_slow_path(lib->router_port == NULL)) {
nxt_unit_alert(NULL, "failed to add router_port");
goto fail;
}
- lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port);
+ queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t));
+ if (nxt_slow_path(queue_fd == -1)) {
+ goto fail;
+ }
+
+ mem = mmap(NULL, sizeof(nxt_port_queue_t),
+ PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
+ if (nxt_slow_path(mem == MAP_FAILED)) {
+ nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
+ strerror(errno), errno);
+
+ goto fail;
+ }
+
+ nxt_port_queue_init(mem);
+
+ lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port, mem);
if (nxt_slow_path(lib->main_ctx.read_port == NULL)) {
nxt_unit_alert(NULL, "failed to add read_port");
+ munmap(mem, sizeof(nxt_port_queue_t));
+
goto fail;
}
- rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream);
+ rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream, queue_fd);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
nxt_unit_alert(NULL, "failed to send READY message");
+ munmap(mem, sizeof(nxt_port_queue_t));
+
goto fail;
}
close(ready_port.out_fd);
+ close(queue_fd);
return ctx;
fail:
+ if (queue_fd != -1) {
+ close(queue_fd);
+ }
+
nxt_unit_ctx_release(&lib->main_ctx.ctx);
return NULL;
@@ -497,6 +549,7 @@ nxt_unit_create(nxt_unit_init_t *init)
rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ pthread_mutex_destroy(&lib->mutex);
goto fail;
}
@@ -505,6 +558,7 @@ nxt_unit_create(nxt_unit_init_t *init)
if (cb->request_handler == NULL) {
nxt_unit_alert(NULL, "request_handler is NULL");
+ pthread_mutex_destroy(&lib->mutex);
goto fail;
}
@@ -765,12 +819,17 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
static int
-nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream)
+nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd)
{
ssize_t res;
nxt_port_msg_t msg;
nxt_unit_impl_t *lib;
+ union {
+ struct cmsghdr cm;
+ char space[CMSG_SPACE(sizeof(int))];
+ } cmsg;
+
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
msg.stream = stream;
@@ -783,7 +842,25 @@ nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream)
msg.mf = 0;
msg.tracking = 0;
- res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg), NULL, 0);
+ memset(&cmsg, 0, sizeof(cmsg));
+
+ cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
+ cmsg.cm.cmsg_level = SOL_SOCKET;
+ cmsg.cm.cmsg_type = SCM_RIGHTS;
+
+ /*
+ * memcpy() is used instead of simple
+ * *(int *) CMSG_DATA(&cmsg.cm) = fd;
+ * because GCC 4.4 with -O2/3/s optimization may issue a warning:
+ * dereferencing type-punned pointer will break strict-aliasing rules
+ *
+ * Fortunately, GCC with -O1 compiles this nxt_memcpy()
+ * in the same simple assignment as in the code above.
+ */
+ memcpy(CMSG_DATA(&cmsg.cm), &queue_fd, sizeof(int));
+
+ res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg),
+ &cmsg, sizeof(cmsg));
if (res != sizeof(msg)) {
return NXT_UNIT_ERROR;
}
@@ -838,6 +915,10 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
goto fail;
}
+ nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd %d fd2 %d",
+ port_msg->stream, (int) port_msg->type,
+ recv_msg.fd, recv_msg.fd2);
+
recv_msg.stream = port_msg->stream;
recv_msg.pid = port_msg->pid;
recv_msg.reply_port = port_msg->reply_port;
@@ -853,19 +934,6 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
goto fail;
}
- if (port_msg->tracking) {
- rc = nxt_unit_tracking_read(ctx, &recv_msg, rbuf);
-
- if (nxt_slow_path(rc != NXT_UNIT_OK)) {
- if (rc == NXT_UNIT_AGAIN) {
- recv_msg.fd = -1;
- recv_msg.fd2 = -1;
- }
-
- goto fail;
- }
- }
-
/* Fragmentation is unsupported. */
if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) {
nxt_unit_warn(ctx, "#%"PRIu32": fragmented message type (%d)",
@@ -929,6 +997,10 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
rc = nxt_unit_process_req_headers(ctx, &recv_msg);
break;
+ case _NXT_PORT_MSG_REQ_BODY:
+ rc = nxt_unit_process_req_body(ctx, &recv_msg);
+ break;
+
case _NXT_PORT_MSG_WEBSOCKET:
rc = nxt_unit_process_websocket(ctx, &recv_msg);
break;
@@ -992,6 +1064,7 @@ static int
nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
int nb;
+ void *mem;
nxt_unit_impl_t *lib;
nxt_unit_port_t new_port, *port;
nxt_port_msg_new_port_t *new_port_msg;
@@ -1013,9 +1086,9 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
new_port_msg = recv_msg->start;
- nxt_unit_debug(ctx, "#%"PRIu32": new_port: %d,%d fd %d",
+ nxt_unit_debug(ctx, "#%"PRIu32": new_port: port{%d,%d} fd %d fd2 %d",
recv_msg->stream, (int) new_port_msg->pid,
- (int) new_port_msg->id, recv_msg->fd);
+ (int) new_port_msg->id, recv_msg->fd, recv_msg->fd2);
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
@@ -1025,6 +1098,9 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
new_port.in_fd = recv_msg->fd;
new_port.out_fd = -1;
+ mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE,
+ MAP_SHARED, recv_msg->fd2, 0);
+
} else {
nb = 0;
@@ -1041,14 +1117,23 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
new_port.in_fd = -1;
new_port.out_fd = recv_msg->fd;
+
+ mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE,
+ MAP_SHARED, recv_msg->fd2, 0);
}
+ if (nxt_slow_path(mem == MAP_FAILED)) {
+ nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd2,
+ strerror(errno), errno);
+
+ return NXT_UNIT_ERROR;
+ }
new_port.data = NULL;
recv_msg->fd = -1;
- port = nxt_unit_add_port(ctx, &new_port);
+ port = nxt_unit_add_port(ctx, &new_port, mem);
if (nxt_slow_path(port == NULL)) {
return NXT_UNIT_ERROR;
}
@@ -1134,6 +1219,7 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
req->response_max_fields = 0;
req_impl->state = NXT_UNIT_RS_START;
req_impl->websocket = 0;
+ req_impl->in_hash = 0;
nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
(int) r->method_length,
@@ -1151,12 +1237,82 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
if (nxt_fast_path(res == NXT_UNIT_OK)) {
res = nxt_unit_send_req_headers_ack(req);
- if (nxt_slow_path(res != NXT_UNIT_OK)) {
- return res;
+ if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
+ nxt_unit_request_done(req, NXT_UNIT_ERROR);
+
+ return NXT_UNIT_ERROR;
}
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ if (req->content_length
+ > (uint64_t) (req->content_buf->end - req->content_buf->free))
+ {
+ res = nxt_unit_request_hash_add(ctx, req);
+ if (nxt_slow_path(res != NXT_UNIT_OK)) {
+ nxt_unit_req_warn(req, "failed to add request to hash");
+
+ nxt_unit_request_done(req, NXT_UNIT_ERROR);
+
+ return NXT_UNIT_ERROR;
+ }
+
+ /*
+ * If application have separate data handler, we may start
+ * request processing and process data when it is arrived.
+ */
+ if (lib->callbacks.data_handler == NULL) {
+ return NXT_UNIT_OK;
+ }
+ }
+
+ lib->callbacks.request_handler(req);
+ }
+
+ return NXT_UNIT_OK;
+}
+
+
+static int
+nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
+{
+ uint64_t l;
+ nxt_unit_impl_t *lib;
+ nxt_unit_mmap_buf_t *b;
+ nxt_unit_request_info_t *req;
+
+ req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
+ if (req == NULL) {
+ return NXT_UNIT_OK;
+ }
+
+ l = req->content_buf->end - req->content_buf->free;
+
+ for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
+ b->req = req;
+ l += b->buf.end - b->buf.free;
+ }
+
+ if (recv_msg->incoming_buf != NULL) {
+ b = nxt_container_of(req->content_buf, nxt_unit_mmap_buf_t, buf);
+
+ /* "Move" incoming buffer list to req_impl. */
+ nxt_unit_mmap_buf_insert_tail(&b->next, recv_msg->incoming_buf);
+ recv_msg->incoming_buf = NULL;
+ }
+
+ req->content_fd = recv_msg->fd;
+ recv_msg->fd = -1;
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
+ if (lib->callbacks.data_handler != NULL) {
+ lib->callbacks.data_handler(req);
+
+ return NXT_UNIT_OK;
+ }
+
+ if (req->content_fd != -1 || l == req->content_length) {
lib->callbacks.request_handler(req);
}
@@ -1260,6 +1416,9 @@ nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
nxt_queue_insert_tail(&process->ports, &port_impl->link);
port_impl->process = process;
+ port_impl->queue = NULL;
+ port_impl->from_socket = 0;
+ port_impl->socket_rbuf = NULL;
nxt_queue_init(&port_impl->awaiting_req);
@@ -1321,21 +1480,17 @@ nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
size_t hsize;
nxt_unit_impl_t *lib;
nxt_unit_mmap_buf_t *b;
- nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_callbacks_t *cb;
nxt_unit_request_info_t *req;
nxt_unit_request_info_impl_t *req_impl;
nxt_unit_websocket_frame_impl_t *ws_impl;
- ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
-
- req_impl = nxt_unit_request_hash_find(&ctx_impl->requests, recv_msg->stream,
- recv_msg->last);
- if (req_impl == NULL) {
+ req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
+ if (nxt_slow_path(req == NULL)) {
return NXT_UNIT_OK;
}
- req = &req_impl->req;
+ req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
cb = &lib->callbacks;
@@ -1501,12 +1656,12 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req)
req->response = NULL;
req->response_buf = NULL;
- if (req_impl->websocket) {
- nxt_unit_request_hash_find(&ctx_impl->requests, req_impl->stream, 1);
-
- req_impl->websocket = 0;
+ if (req_impl->in_hash) {
+ nxt_unit_request_hash_find(req->ctx, req_impl->stream, 1);
}
+ req_impl->websocket = 0;
+
while (req_impl->outgoing_buf != NULL) {
nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
}
@@ -2170,7 +2325,6 @@ int
nxt_unit_response_upgrade(nxt_unit_request_info_t *req)
{
int rc;
- nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_request_info_impl_t *req_impl;
req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
@@ -2193,9 +2347,7 @@ nxt_unit_response_upgrade(nxt_unit_request_info_t *req)
return NXT_UNIT_ERROR;
}
- ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
-
- rc = nxt_unit_request_hash_add(&ctx_impl->requests, req_impl);
+ rc = nxt_unit_request_hash_add(req->ctx, req);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
nxt_unit_req_warn(req, "upgrade: failed to add request to hash");
@@ -2466,6 +2618,8 @@ nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx)
pthread_mutex_unlock(&ctx_impl->mutex);
+ memset(rbuf->oob, 0, sizeof(struct cmsghdr));
+
return rbuf;
}
@@ -2564,6 +2718,8 @@ nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start,
nxt_unit_request_info_impl_t *req_impl;
char local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
+ nxt_unit_req_debug(req, "write: %d", (int) size);
+
req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
part_start = start;
@@ -2743,9 +2899,11 @@ nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length,
dst, size);
+ nxt_unit_req_debug(req, "read: %d", (int) buf_res);
+
if (buf_res < (ssize_t) size && req->content_fd != -1) {
res = read(req->content_fd, dst, size);
- if (res < 0) {
+ if (nxt_slow_path(res < 0)) {
nxt_unit_req_alert(req, "failed to read content: %s (%d)",
strerror(errno), errno);
@@ -3301,7 +3459,7 @@ nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
static int
nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
{
- nxt_port_msg_t *port_msg;
+ int res;
nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_read_buf_t *rbuf;
@@ -3313,21 +3471,15 @@ nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
return NXT_UNIT_ERROR;
}
- memset(rbuf->oob, 0, sizeof(struct cmsghdr));
-
- nxt_unit_port_recv(ctx, ctx_impl->read_port, rbuf);
-
- if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
+ res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
+ if (res == NXT_UNIT_ERROR) {
nxt_unit_read_buf_release(ctx, rbuf);
return NXT_UNIT_ERROR;
}
- port_msg = (nxt_port_msg_t *) rbuf->buf;
-
- if (port_msg->type == _NXT_PORT_MSG_SHM_ACK) {
+ if (nxt_unit_is_shm_ack(rbuf)) {
nxt_unit_read_buf_release(ctx, rbuf);
-
break;
}
@@ -3337,7 +3489,7 @@ nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
pthread_mutex_unlock(&ctx_impl->mutex);
- if (port_msg->type == _NXT_PORT_MSG_QUIT) {
+ if (nxt_unit_is_quit(rbuf)) {
nxt_unit_debug(ctx, "oosm: quit received");
return NXT_UNIT_ERROR;
@@ -3406,7 +3558,6 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n)
{
int i, fd, rc;
void *mem;
- char name[64];
nxt_unit_mmap_t *mm;
nxt_unit_impl_t *lib;
nxt_port_mmap_header_t *hdr;
@@ -3420,59 +3571,8 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n)
return NULL;
}
- snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p",
- lib->pid, (void *) pthread_self());
-
-#if (NXT_HAVE_MEMFD_CREATE)
-
- fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
+ fd = nxt_unit_shm_open(ctx, PORT_MMAP_SIZE);
if (nxt_slow_path(fd == -1)) {
- nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name,
- strerror(errno), errno);
-
- goto remove_fail;
- }
-
- nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd);
-
-#elif (NXT_HAVE_SHM_OPEN_ANON)
-
- fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR);
- if (nxt_slow_path(fd == -1)) {
- nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)",
- strerror(errno), errno);
-
- goto remove_fail;
- }
-
-#elif (NXT_HAVE_SHM_OPEN)
-
- /* Just in case. */
- shm_unlink(name);
-
- fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
- if (nxt_slow_path(fd == -1)) {
- nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name,
- strerror(errno), errno);
-
- goto remove_fail;
- }
-
- if (nxt_slow_path(shm_unlink(name) == -1)) {
- nxt_unit_warn(ctx, "shm_unlink(%s) failed: %s (%d)", name,
- strerror(errno), errno);
- }
-
-#else
-
-#error No working shared memory implementation.
-
-#endif
-
- if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) {
- nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd,
- strerror(errno), errno);
-
goto remove_fail;
}
@@ -3481,6 +3581,8 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n)
nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", fd,
strerror(errno), errno);
+ close(fd);
+
goto remove_fail;
}
@@ -3533,6 +3635,80 @@ remove_fail:
static int
+nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size)
+{
+ int fd;
+ nxt_unit_impl_t *lib;
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
+#if (NXT_HAVE_MEMFD_CREATE || NXT_HAVE_SHM_OPEN)
+ char name[64];
+
+ snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p",
+ lib->pid, (void *) pthread_self());
+#endif
+
+#if (NXT_HAVE_MEMFD_CREATE)
+
+ fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
+ if (nxt_slow_path(fd == -1)) {
+ nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name,
+ strerror(errno), errno);
+
+ return -1;
+ }
+
+ nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd);
+
+#elif (NXT_HAVE_SHM_OPEN_ANON)
+
+ fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR);
+ if (nxt_slow_path(fd == -1)) {
+ nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)",
+ strerror(errno), errno);
+
+ return -1;
+ }
+
+#elif (NXT_HAVE_SHM_OPEN)
+
+ /* Just in case. */
+ shm_unlink(name);
+
+ fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
+ if (nxt_slow_path(fd == -1)) {
+ nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name,
+ strerror(errno), errno);
+
+ return -1;
+ }
+
+ if (nxt_slow_path(shm_unlink(name) == -1)) {
+ nxt_unit_alert(ctx, "shm_unlink(%s) failed: %s (%d)", name,
+ strerror(errno), errno);
+ }
+
+#else
+
+#error No working shared memory implementation.
+
+#endif
+
+ if (nxt_slow_path(ftruncate(fd, size) == -1)) {
+ nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd,
+ strerror(errno), errno);
+
+ close(fd);
+
+ return -1;
+ }
+
+ return fd;
+}
+
+
+static int
nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int fd)
{
ssize_t res;
@@ -3797,61 +3973,6 @@ nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps)
static int
-nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
- nxt_unit_read_buf_t *rbuf)
-{
- int res;
- nxt_chunk_id_t c;
- nxt_unit_impl_t *lib;
- nxt_port_mmap_header_t *hdr;
- nxt_port_mmap_tracking_msg_t *tracking_msg;
-
- if (recv_msg->size < (int) sizeof(nxt_port_mmap_tracking_msg_t)) {
- nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: too small message (%d)",
- recv_msg->stream, (int) recv_msg->size);
-
- return NXT_UNIT_ERROR;
- }
-
- tracking_msg = recv_msg->start;
-
- recv_msg->start = tracking_msg + 1;
- recv_msg->size -= sizeof(nxt_port_mmap_tracking_msg_t);
-
- lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
-
- pthread_mutex_lock(&lib->incoming.mutex);
-
- res = nxt_unit_check_rbuf_mmap(ctx, &lib->incoming,
- recv_msg->pid, tracking_msg->mmap_id,
- &hdr, rbuf);
-
- if (nxt_slow_path(res != NXT_UNIT_OK)) {
- return res;
- }
-
- c = tracking_msg->tracking_id;
- res = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0);
-
- if (res == 0) {
- nxt_unit_debug(ctx, "#%"PRIu32": tracking cancelled",
- recv_msg->stream);
-
- nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
-
- res = NXT_UNIT_CANCELLED;
-
- } else {
- res = NXT_UNIT_OK;
- }
-
- pthread_mutex_unlock(&lib->incoming.mutex);
-
- return res;
-}
-
-
-static int
nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, nxt_unit_mmaps_t *mmaps,
pid_t pid, uint32_t id, nxt_port_mmap_header_t **hdr,
nxt_unit_read_buf_t *rbuf)
@@ -4154,7 +4275,7 @@ nxt_unit_process_get(nxt_unit_impl_t *lib, pid_t pid)
}
process->pid = pid;
- process->use_count = 1;
+ process->use_count = 2;
process->next_port_id = 0;
process->lib = lib;
@@ -4176,8 +4297,6 @@ nxt_unit_process_get(nxt_unit_impl_t *lib, pid_t pid)
break;
}
- nxt_unit_process_use(process);
-
return process;
}
@@ -4293,22 +4412,52 @@ nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx)
static int
nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
{
- int res, err;
- nxt_unit_impl_t *lib;
- nxt_unit_ctx_impl_t *ctx_impl;
- struct pollfd fds[2];
+ int nevents, res, err;
+ nxt_unit_impl_t *lib;
+ nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_port_impl_t *port_impl;
+ struct pollfd fds[2];
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
- memset(rbuf->oob, 0, sizeof(struct cmsghdr));
-
if (ctx_impl->wait_items > 0 || lib->shared_port == NULL) {
- return nxt_unit_port_recv(ctx, ctx_impl->read_port, rbuf);
+
+ return nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
}
+ port_impl = nxt_container_of(ctx_impl->read_port, nxt_unit_port_impl_t,
+ port);
+
retry:
+ if (port_impl->from_socket == 0) {
+ res = nxt_unit_port_queue_recv(ctx_impl->read_port, rbuf);
+ if (res == NXT_UNIT_OK) {
+ if (nxt_unit_is_read_socket(rbuf)) {
+ port_impl->from_socket++;
+
+ nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d",
+ (int) ctx_impl->read_port->id.pid,
+ (int) ctx_impl->read_port->id.id,
+ port_impl->from_socket);
+
+ } else {
+ nxt_unit_debug(ctx, "port{%d,%d} dequeue %d",
+ (int) ctx_impl->read_port->id.pid,
+ (int) ctx_impl->read_port->id.id,
+ (int) rbuf->size);
+
+ return NXT_UNIT_OK;
+ }
+ }
+ }
+
+ res = nxt_unit_app_queue_recv(lib->shared_port, rbuf);
+ if (res == NXT_UNIT_OK) {
+ return NXT_UNIT_OK;
+ }
+
fds[0].fd = ctx_impl->read_port->in_fd;
fds[0].events = POLLIN;
fds[0].revents = 0;
@@ -4317,31 +4466,47 @@ retry:
fds[1].events = POLLIN;
fds[1].revents = 0;
- res = poll(fds, 2, -1);
- if (nxt_slow_path(res < 0)) {
+ nevents = poll(fds, 2, -1);
+ if (nxt_slow_path(nevents == -1)) {
err = errno;
if (err == EINTR) {
goto retry;
}
- nxt_unit_alert(ctx, "poll() failed: %s (%d)",
- strerror(err), err);
+ nxt_unit_alert(ctx, "poll(%d,%d) failed: %s (%d)",
+ fds[0].fd, fds[1].fd, strerror(err), err);
rbuf->size = -1;
return (err == EAGAIN) ? NXT_UNIT_AGAIN : NXT_UNIT_ERROR;
}
+ nxt_unit_debug(ctx, "poll(%d,%d): %d, revents [%04uXi, %04uXi]",
+ fds[0].fd, fds[1].fd, nevents, fds[0].revents,
+ fds[1].revents);
+
if ((fds[0].revents & POLLIN) != 0) {
- return nxt_unit_port_recv(ctx, ctx_impl->read_port, rbuf);
+ res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
+ if (res == NXT_UNIT_AGAIN) {
+ goto retry;
+ }
+
+ return res;
}
if ((fds[1].revents & POLLIN) != 0) {
- return nxt_unit_port_recv(ctx, lib->shared_port, rbuf);
+ res = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf);
+ if (res == NXT_UNIT_AGAIN) {
+ goto retry;
+ }
+
+ return res;
}
- rbuf->size = -1;
+ nxt_unit_alert(ctx, "poll(%d,%d): %d unexpected revents [%04uXi, %04uXi]",
+ fds[0].fd, fds[1].fd, nevents, fds[0].revents,
+ fds[1].revents);
return NXT_UNIT_ERROR;
}
@@ -4392,9 +4557,11 @@ nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx)
static void
nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx)
{
+ int res;
nxt_queue_t ready_req;
nxt_unit_impl_t *lib;
nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_request_info_t *req;
nxt_unit_request_info_impl_t *req_impl;
nxt_queue_init(&ready_req);
@@ -4419,7 +4586,35 @@ nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx)
{
lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit);
- (void) nxt_unit_send_req_headers_ack(&req_impl->req);
+ req = &req_impl->req;
+
+ res = nxt_unit_send_req_headers_ack(req);
+ if (nxt_slow_path(res != NXT_UNIT_OK)) {
+ nxt_unit_request_done(req, NXT_UNIT_ERROR);
+
+ continue;
+ }
+
+ if (req->content_length
+ > (uint64_t) (req->content_buf->end - req->content_buf->free))
+ {
+ res = nxt_unit_request_hash_add(ctx, req);
+ if (nxt_slow_path(res != NXT_UNIT_OK)) {
+ nxt_unit_req_warn(req, "failed to add request to hash");
+
+ nxt_unit_request_done(req, NXT_UNIT_ERROR);
+
+ continue;
+ }
+
+ /*
+ * If application have separate data handler, we may start
+ * request processing and process data when it is arrived.
+ */
+ if (lib->callbacks.data_handler == NULL) {
+ continue;
+ }
+ }
lib->callbacks.request_handler(&req_impl->req);
@@ -4432,6 +4627,7 @@ nxt_unit_run_ctx(nxt_unit_ctx_t *ctx)
{
int rc;
nxt_unit_impl_t *lib;
+ nxt_unit_read_buf_t *rbuf;
nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_ctx_use(ctx);
@@ -4442,11 +4638,30 @@ nxt_unit_run_ctx(nxt_unit_ctx_t *ctx)
rc = NXT_UNIT_OK;
while (nxt_fast_path(lib->online)) {
- rc = nxt_unit_process_port_msg_impl(ctx, ctx_impl->read_port);
+ rbuf = nxt_unit_read_buf_get(ctx);
+ if (nxt_slow_path(rbuf == NULL)) {
+ rc = NXT_UNIT_ERROR;
+ break;
+ }
+ retry:
+
+ rc = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
+ if (rc == NXT_UNIT_AGAIN) {
+ goto retry;
+ }
+
+ rc = nxt_unit_process_msg(ctx, rbuf);
+ if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
+ break;
+ }
+
+ rc = nxt_unit_process_pending_rbuf(ctx);
if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
break;
}
+
+ nxt_unit_process_ready_req(ctx);
}
nxt_unit_ctx_release(ctx);
@@ -4455,11 +4670,68 @@ nxt_unit_run_ctx(nxt_unit_ctx_t *ctx)
}
+nxt_inline int
+nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf)
+{
+ nxt_port_msg_t *port_msg;
+
+ if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
+ port_msg = (nxt_port_msg_t *) rbuf->buf;
+
+ return port_msg->type == _NXT_PORT_MSG_READ_QUEUE;
+ }
+
+ return 0;
+}
+
+
+nxt_inline int
+nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf)
+{
+ if (nxt_fast_path(rbuf->size == 1)) {
+ return rbuf->buf[0] == _NXT_PORT_MSG_READ_SOCKET;
+ }
+
+ return 0;
+}
+
+
+nxt_inline int
+nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf)
+{
+ nxt_port_msg_t *port_msg;
+
+ if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
+ port_msg = (nxt_port_msg_t *) rbuf->buf;
+
+ return port_msg->type == _NXT_PORT_MSG_SHM_ACK;
+ }
+
+ return 0;
+}
+
+
+nxt_inline int
+nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf)
+{
+ nxt_port_msg_t *port_msg;
+
+ if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
+ port_msg = (nxt_port_msg_t *) rbuf->buf;
+
+ return port_msg->type == _NXT_PORT_MSG_QUIT;
+ }
+
+ return 0;
+}
+
+
int
nxt_unit_run_shared(nxt_unit_ctx_t *ctx)
{
- int rc;
- nxt_unit_impl_t *lib;
+ int rc;
+ nxt_unit_impl_t *lib;
+ nxt_unit_read_buf_t *rbuf;
nxt_unit_ctx_use(ctx);
@@ -4467,11 +4739,35 @@ nxt_unit_run_shared(nxt_unit_ctx_t *ctx)
rc = NXT_UNIT_OK;
while (nxt_fast_path(lib->online)) {
- rc = nxt_unit_process_port_msg_impl(ctx, lib->shared_port);
+ rbuf = nxt_unit_read_buf_get(ctx);
+ if (nxt_slow_path(rbuf == NULL)) {
+ rc = NXT_UNIT_ERROR;
+ break;
+ }
+
+ retry:
+ rc = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf);
+ if (rc == NXT_UNIT_AGAIN) {
+ goto retry;
+ }
+
+ if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
+ nxt_unit_read_buf_release(ctx, rbuf);
+ break;
+ }
+
+ rc = nxt_unit_process_msg(ctx, rbuf);
+ if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
+ break;
+ }
+
+ rc = nxt_unit_process_pending_rbuf(ctx);
if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
break;
}
+
+ nxt_unit_process_ready_req(ctx);
}
nxt_unit_ctx_release(ctx);
@@ -4499,6 +4795,7 @@ static int
nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
{
int rc;
+ nxt_unit_impl_t *lib;
nxt_unit_read_buf_t *rbuf;
rbuf = nxt_unit_read_buf_get(ctx);
@@ -4506,10 +4803,18 @@ nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
return NXT_UNIT_ERROR;
}
- memset(rbuf->oob, 0, sizeof(struct cmsghdr));
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
- rc = nxt_unit_port_recv(ctx, port, rbuf);
- if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+retry:
+
+ if (port == lib->shared_port) {
+ rc = nxt_unit_shared_port_recv(ctx, port, rbuf);
+
+ } else {
+ rc = nxt_unit_ctx_port_recv(ctx, port, rbuf);
+ }
+
+ if (rc != NXT_UNIT_OK) {
nxt_unit_read_buf_release(ctx, rbuf);
return rc;
}
@@ -4526,6 +4831,15 @@ nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
nxt_unit_process_ready_req(ctx);
+ rbuf = nxt_unit_read_buf_get(ctx);
+ if (nxt_slow_path(rbuf == NULL)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ if (lib->online) {
+ goto retry;
+ }
+
return rc;
}
@@ -4540,10 +4854,12 @@ nxt_unit_done(nxt_unit_ctx_t *ctx)
nxt_unit_ctx_t *
nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
{
- int rc;
- nxt_unit_impl_t *lib;
- nxt_unit_port_t *port;
- nxt_unit_ctx_impl_t *new_ctx;
+ int rc, queue_fd;
+ void *mem;
+ nxt_unit_impl_t *lib;
+ nxt_unit_port_t *port;
+ nxt_unit_ctx_impl_t *new_ctx;
+ nxt_unit_port_impl_t *port_impl;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
@@ -4554,33 +4870,57 @@ nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
return NULL;
}
+ rc = nxt_unit_ctx_init(lib, new_ctx, data);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ free(new_ctx);
+
+ return NULL;
+ }
+
+ queue_fd = -1;
+
port = nxt_unit_create_port(ctx);
if (nxt_slow_path(port == NULL)) {
- free(new_ctx);
+ goto fail;
+ }
- return NULL;
+ new_ctx->read_port = port;
+
+ queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t));
+ if (nxt_slow_path(queue_fd == -1)) {
+ goto fail;
}
- rc = nxt_unit_send_port(ctx, lib->router_port, port);
- if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ mem = mmap(NULL, sizeof(nxt_port_queue_t),
+ PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
+ if (nxt_slow_path(mem == MAP_FAILED)) {
+ nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
+ strerror(errno), errno);
+
goto fail;
}
- rc = nxt_unit_ctx_init(lib, new_ctx, data);
+ nxt_port_queue_init(mem);
+
+ port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
+ port_impl->queue = mem;
+
+ rc = nxt_unit_send_port(ctx, lib->router_port, port, queue_fd);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
goto fail;
}
- new_ctx->read_port = port;
+ close(queue_fd);
return &new_ctx->ctx;
fail:
- nxt_unit_remove_port(lib, &port->id);
- nxt_unit_port_release(port);
+ if (queue_fd != -1) {
+ close(queue_fd);
+ }
- free(new_ctx);
+ nxt_unit_ctx_release(&new_ctx->ctx);
return NULL;
}
@@ -4633,6 +4973,7 @@ nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl)
nxt_queue_remove(&ctx_impl->link);
if (nxt_fast_path(ctx_impl->read_port != NULL)) {
+ nxt_unit_remove_port(lib, &ctx_impl->read_port->id);
nxt_unit_port_release(ctx_impl->read_port);
}
@@ -4709,10 +5050,8 @@ nxt_unit_create_port(nxt_unit_ctx_t *ctx)
nxt_unit_process_release(process);
- port = nxt_unit_add_port(ctx, &new_port);
+ port = nxt_unit_add_port(ctx, &new_port, NULL);
if (nxt_slow_path(port == NULL)) {
- nxt_unit_alert(ctx, "create_port: add_port() failed");
-
close(port_sockets[0]);
close(port_sockets[1]);
}
@@ -4723,10 +5062,11 @@ nxt_unit_create_port(nxt_unit_ctx_t *ctx)
static int
nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
- nxt_unit_port_t *port)
+ nxt_unit_port_t *port, int queue_fd)
{
ssize_t res;
nxt_unit_impl_t *lib;
+ int fds[2] = { port->out_fd, queue_fd };
struct {
nxt_port_msg_t msg;
@@ -4735,7 +5075,7 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
union {
struct cmsghdr cm;
- char space[CMSG_SPACE(sizeof(int))];
+ char space[CMSG_SPACE(sizeof(int) * 2)];
} cmsg;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
@@ -4758,7 +5098,7 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
memset(&cmsg, 0, sizeof(cmsg));
- cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
+ cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int) * 2);
cmsg.cm.cmsg_level = SOL_SOCKET;
cmsg.cm.cmsg_type = SCM_RIGHTS;
@@ -4771,7 +5111,7 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
* Fortunately, GCC with -O1 compiles this nxt_memcpy()
* in the same simple assignment as in the code above.
*/
- memcpy(CMSG_DATA(&cmsg.cm), &port->out_fd, sizeof(int));
+ memcpy(CMSG_DATA(&cmsg.cm), fds, sizeof(int) * 2);
res = nxt_unit_port_send(ctx, dst, &m, sizeof(m), &cmsg, sizeof(cmsg));
@@ -4799,7 +5139,7 @@ nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port)
c = nxt_atomic_fetch_add(&port_impl->use_count, -1);
if (c == 1) {
- nxt_unit_debug(NULL, "destroy port %d,%d",
+ nxt_unit_debug(NULL, "destroy port{%d,%d}",
(int) port->id.pid, (int) port->id.id);
nxt_unit_process_release(port_impl->process);
@@ -4816,13 +5156,31 @@ nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port)
port->out_fd = -1;
}
+ if (port->in_fd != -1) {
+ close(port->in_fd);
+
+ port->in_fd = -1;
+ }
+
+ if (port->out_fd != -1) {
+ close(port->out_fd);
+
+ port->out_fd = -1;
+ }
+
+ if (port_impl->queue != NULL) {
+ munmap(port_impl->queue, (port->id.id == (nxt_port_id_t) -1)
+ ? sizeof(nxt_app_queue_t)
+ : sizeof(nxt_port_queue_t));
+ }
+
free(port_impl);
}
}
static nxt_unit_port_t *
-nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
+nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue)
{
int rc;
nxt_queue_t awaiting_req;
@@ -4840,9 +5198,10 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
old_port = nxt_unit_port_hash_find(&lib->ports, &port->id, 0);
if (nxt_slow_path(old_port != NULL)) {
- nxt_unit_debug(ctx, "add_port: duplicate %d,%d in_fd %d out_fd %d",
- port->id.pid, port->id.id,
- port->in_fd, port->out_fd);
+ nxt_unit_debug(ctx, "add_port: duplicate port{%d,%d} "
+ "in_fd %d out_fd %d queue %p",
+ port->id.pid, port->id.id,
+ port->in_fd, port->out_fd, queue);
if (old_port->data == NULL) {
old_port->data = port->data;
@@ -4875,6 +5234,10 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
old_port_impl = nxt_container_of(old_port, nxt_unit_port_impl_t, port);
+ if (old_port_impl->queue == NULL) {
+ old_port_impl->queue = queue;
+ }
+
if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) {
nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req);
nxt_queue_init(&old_port_impl->awaiting_req);
@@ -4914,9 +5277,9 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
new_port = NULL;
- nxt_unit_debug(ctx, "add_port: %d,%d in_fd %d out_fd %d",
+ nxt_unit_debug(ctx, "add_port: port{%d,%d} in_fd %d out_fd %d queue %p",
port->id.pid, port->id.id,
- port->in_fd, port->out_fd);
+ port->in_fd, port->out_fd, queue);
process = nxt_unit_process_get(lib, port->id.pid);
if (nxt_slow_path(process == NULL)) {
@@ -4929,6 +5292,9 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
new_port = malloc(sizeof(nxt_unit_port_impl_t));
if (nxt_slow_path(new_port == NULL)) {
+ nxt_unit_alert(ctx, "add_port: %d,%d malloc() failed",
+ port->id.pid, port->id.id);
+
goto unlock;
}
@@ -4951,6 +5317,9 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
new_port->use_count = 2;
new_port->process = process;
new_port->ready = (port->in_fd != -1 || port->out_fd != -1);
+ new_port->queue = queue;
+ new_port->from_socket = 0;
+ new_port->socket_rbuf = NULL;
nxt_queue_init(&new_port->awaiting_req);
@@ -5010,13 +5379,13 @@ nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id)
port = nxt_unit_port_hash_find(&lib->ports, port_id, 1);
if (nxt_slow_path(port == NULL)) {
- nxt_unit_debug(NULL, "remove_port: port %d,%d not found",
+ nxt_unit_debug(NULL, "remove_port: port{%d,%d} not found",
(int) port_id->pid, (int) port_id->id);
return NULL;
}
- nxt_unit_debug(NULL, "remove_port: port %d,%d, fds %d,%d, data %p",
+ nxt_unit_debug(NULL, "remove_port: port{%d,%d}, fds %d,%d, data %p",
(int) port_id->pid, (int) port_id->id,
port->in_fd, port->out_fd, port->data);
@@ -5089,10 +5458,12 @@ nxt_unit_quit(nxt_unit_ctx_t *ctx)
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
- lib->online = 0;
+ if (lib->online) {
+ lib->online = 0;
- if (lib->callbacks.quit != NULL) {
- lib->callbacks.quit(ctx);
+ if (lib->callbacks.quit != NULL) {
+ lib->callbacks.quit(ctx);
+ }
}
}
@@ -5137,20 +5508,91 @@ static ssize_t
nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
const void *buf, size_t buf_size, const void *oob, size_t oob_size)
{
- nxt_unit_impl_t *lib;
-
- nxt_unit_debug(ctx, "port_send: port %d,%d fd %d",
- (int) port->id.pid, (int) port->id.id, port->out_fd);
+ int notify;
+ ssize_t ret;
+ nxt_int_t rc;
+ nxt_port_msg_t msg;
+ nxt_unit_impl_t *lib;
+ nxt_unit_port_impl_t *port_impl;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
+ if (port_impl->queue != NULL && oob_size == 0
+ && buf_size <= NXT_PORT_QUEUE_MSG_SIZE)
+ {
+ rc = nxt_port_queue_send(port_impl->queue, buf, buf_size, &notify);
+ if (nxt_slow_path(rc != NXT_OK)) {
+ nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow",
+ (int) port->id.pid, (int) port->id.id);
+
+ return -1;
+ }
+
+ nxt_unit_debug(ctx, "port{%d,%d} enqueue %d notify %d",
+ (int) port->id.pid, (int) port->id.id,
+ (int) buf_size, notify);
+
+ if (notify) {
+ memcpy(&msg, buf, sizeof(nxt_port_msg_t));
+
+ msg.type = _NXT_PORT_MSG_READ_QUEUE;
+
+ if (lib->callbacks.port_send == NULL) {
+ ret = nxt_unit_sendmsg(ctx, port->out_fd, &msg,
+ sizeof(nxt_port_msg_t), NULL, 0);
+
+ nxt_unit_debug(ctx, "port{%d,%d} send %d read_queue",
+ (int) port->id.pid, (int) port->id.id,
+ (int) ret);
+
+ } else {
+ ret = lib->callbacks.port_send(ctx, port, &msg,
+ sizeof(nxt_port_msg_t), NULL, 0);
+
+ nxt_unit_debug(ctx, "port{%d,%d} sendcb %d read_queue",
+ (int) port->id.pid, (int) port->id.id,
+ (int) ret);
+ }
+
+ }
+
+ return buf_size;
+ }
+
+ if (port_impl->queue != NULL) {
+ msg.type = _NXT_PORT_MSG_READ_SOCKET;
+
+ rc = nxt_port_queue_send(port_impl->queue, &msg.type, 1, &notify);
+ if (nxt_slow_path(rc != NXT_OK)) {
+ nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow",
+ (int) port->id.pid, (int) port->id.id);
+
+ return -1;
+ }
+
+ nxt_unit_debug(ctx, "port{%d,%d} enqueue 1 read_socket notify %d",
+ (int) port->id.pid, (int) port->id.id, notify);
+ }
+
if (lib->callbacks.port_send != NULL) {
- return lib->callbacks.port_send(ctx, port, buf, buf_size,
- oob, oob_size);
+ ret = lib->callbacks.port_send(ctx, port, buf, buf_size,
+ oob, oob_size);
+
+ nxt_unit_debug(ctx, "port{%d,%d} sendcb %d",
+ (int) port->id.pid, (int) port->id.id,
+ (int) ret);
+
+ } else {
+ ret = nxt_unit_sendmsg(ctx, port->out_fd, buf, buf_size,
+ oob, oob_size);
+
+ nxt_unit_debug(ctx, "port{%d,%d} sendmsg %d",
+ (int) port->id.pid, (int) port->id.id,
+ (int) ret);
}
- return nxt_unit_sendmsg(ctx, port->out_fd, buf, buf_size,
- oob, oob_size);
+ return ret;
}
@@ -5158,6 +5600,7 @@ static ssize_t
nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
const void *buf, size_t buf_size, const void *oob, size_t oob_size)
{
+ int err;
ssize_t res;
struct iovec iov[1];
struct msghdr msg;
@@ -5178,7 +5621,9 @@ retry:
res = sendmsg(fd, &msg, 0);
if (nxt_slow_path(res == -1)) {
- if (errno == EINTR) {
+ err = errno;
+
+ if (err == EINTR) {
goto retry;
}
@@ -5187,7 +5632,7 @@ retry:
* implementation.
*/
nxt_unit_warn(ctx, "sendmsg(%d, %d) failed: %s (%d)",
- fd, (int) buf_size, strerror(errno), errno);
+ fd, (int) buf_size, strerror(err), err);
} else {
nxt_unit_debug(ctx, "sendmsg(%d, %d): %d", fd, (int) buf_size,
@@ -5199,6 +5644,158 @@ retry:
static int
+nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
+ nxt_unit_read_buf_t *rbuf)
+{
+ int res, read;
+ nxt_unit_port_impl_t *port_impl;
+
+ port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
+
+ read = 0;
+
+retry:
+
+ if (port_impl->from_socket > 0) {
+ if (port_impl->socket_rbuf != NULL
+ && port_impl->socket_rbuf->size > 0)
+ {
+ port_impl->from_socket--;
+
+ nxt_unit_rbuf_cpy(rbuf, port_impl->socket_rbuf);
+ port_impl->socket_rbuf->size = 0;
+
+ nxt_unit_debug(ctx, "port{%d,%d} use suspended message %d",
+ (int) port->id.pid, (int) port->id.id,
+ (int) rbuf->size);
+
+ return NXT_UNIT_OK;
+ }
+
+ } else {
+ res = nxt_unit_port_queue_recv(port, rbuf);
+
+ if (res == NXT_UNIT_OK) {
+ if (nxt_unit_is_read_socket(rbuf)) {
+ port_impl->from_socket++;
+
+ nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d",
+ (int) port->id.pid, (int) port->id.id,
+ port_impl->from_socket);
+
+ goto retry;
+ }
+
+ nxt_unit_debug(ctx, "port{%d,%d} dequeue %d",
+ (int) port->id.pid, (int) port->id.id,
+ (int) rbuf->size);
+
+ return NXT_UNIT_OK;
+ }
+ }
+
+ if (read) {
+ return NXT_UNIT_AGAIN;
+ }
+
+ res = nxt_unit_port_recv(ctx, port, rbuf);
+ if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ read = 1;
+
+ if (nxt_unit_is_read_queue(rbuf)) {
+ nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue",
+ (int) port->id.pid, (int) port->id.id, (int) rbuf->size);
+
+ if (port_impl->from_socket) {
+ nxt_unit_warn(ctx, "port protocol warning: READ_QUEUE after READ_SOCKET");
+ }
+
+ goto retry;
+ }
+
+ nxt_unit_debug(ctx, "port{%d,%d} recvmsg %d",
+ (int) port->id.pid, (int) port->id.id,
+ (int) rbuf->size);
+
+ if (res == NXT_UNIT_AGAIN) {
+ return NXT_UNIT_AGAIN;
+ }
+
+ if (port_impl->from_socket > 0) {
+ port_impl->from_socket--;
+
+ return NXT_UNIT_OK;
+ }
+
+ nxt_unit_debug(ctx, "port{%d,%d} suspend message %d",
+ (int) port->id.pid, (int) port->id.id,
+ (int) rbuf->size);
+
+ if (port_impl->socket_rbuf == NULL) {
+ port_impl->socket_rbuf = nxt_unit_read_buf_get(ctx);
+
+ if (nxt_slow_path(port_impl->socket_rbuf == NULL)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ port_impl->socket_rbuf->size = 0;
+ }
+
+ if (port_impl->socket_rbuf->size > 0) {
+ nxt_unit_alert(ctx, "too many port socket messages");
+
+ return NXT_UNIT_ERROR;
+ }
+
+ nxt_unit_rbuf_cpy(port_impl->socket_rbuf, rbuf);
+
+ memset(rbuf->oob, 0, sizeof(struct cmsghdr));
+
+ goto retry;
+}
+
+
+nxt_inline void
+nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst, nxt_unit_read_buf_t *src)
+{
+ memcpy(dst->buf, src->buf, src->size);
+ dst->size = src->size;
+ memcpy(dst->oob, src->oob, sizeof(src->oob));
+}
+
+
+static int
+nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
+ nxt_unit_read_buf_t *rbuf)
+{
+ int res;
+
+retry:
+
+ res = nxt_unit_app_queue_recv(port, rbuf);
+
+ if (res == NXT_UNIT_AGAIN) {
+ res = nxt_unit_port_recv(ctx, port, rbuf);
+ if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ if (nxt_unit_is_read_queue(rbuf)) {
+ nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue",
+ (int) port->id.pid, (int) port->id.id, (int) rbuf->size);
+
+ goto retry;
+ }
+ }
+
+ return res;
+}
+
+
+static int
nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
nxt_unit_read_buf_t *rbuf)
{
@@ -5214,6 +5811,9 @@ nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
rbuf->buf, sizeof(rbuf->buf),
rbuf->oob, sizeof(rbuf->oob));
+ nxt_unit_debug(ctx, "port{%d,%d} recvcb %d",
+ (int) port->id.pid, (int) port->id.id, (int) rbuf->size);
+
if (nxt_slow_path(rbuf->size < 0)) {
return NXT_UNIT_ERROR;
}
@@ -5247,13 +5847,13 @@ retry:
if (err == EAGAIN) {
nxt_unit_debug(ctx, "recvmsg(%d) failed: %s (%d)",
- fd, strerror(errno), errno);
+ fd, strerror(err), err);
return NXT_UNIT_AGAIN;
}
nxt_unit_alert(ctx, "recvmsg(%d) failed: %s (%d)",
- fd, strerror(errno), errno);
+ fd, strerror(err), err);
return NXT_UNIT_ERROR;
}
@@ -5264,6 +5864,52 @@ retry:
}
+static int
+nxt_unit_port_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf)
+{
+ nxt_unit_port_impl_t *port_impl;
+
+ port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
+
+ rbuf->size = nxt_port_queue_recv(port_impl->queue, rbuf->buf);
+
+ return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK;
+}
+
+
+static int
+nxt_unit_app_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf)
+{
+ uint32_t cookie;
+ nxt_port_msg_t *port_msg;
+ nxt_app_queue_t *queue;
+ nxt_unit_port_impl_t *port_impl;
+
+ port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
+ queue = port_impl->queue;
+
+retry:
+
+ rbuf->size = nxt_app_queue_recv(queue, rbuf->buf, &cookie);
+
+ nxt_unit_debug(NULL, "app_queue_recv: %d", (int) rbuf->size);
+
+ if (rbuf->size >= (ssize_t) sizeof(nxt_port_msg_t)) {
+ port_msg = (nxt_port_msg_t *) rbuf->buf;
+
+ if (nxt_app_queue_cancel(queue, cookie, port_msg->stream)) {
+ return NXT_UNIT_OK;
+ }
+
+ nxt_unit_debug(NULL, "app_queue_recv: message cancelled");
+
+ goto retry;
+ }
+
+ return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK;
+}
+
+
static nxt_int_t
nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
{
@@ -5392,12 +6038,19 @@ static const nxt_lvlhsh_proto_t lvlhsh_requests_proto nxt_aligned(64) = {
static int
-nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash,
- nxt_unit_request_info_impl_t *req_impl)
+nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
+ nxt_unit_request_info_t *req)
{
- uint32_t *stream;
- nxt_int_t res;
- nxt_lvlhsh_query_t lhq;
+ uint32_t *stream;
+ nxt_int_t res;
+ nxt_lvlhsh_query_t lhq;
+ nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_request_info_impl_t *req_impl;
+
+ req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
+ if (req_impl->in_hash) {
+ return NXT_UNIT_OK;
+ }
stream = &req_impl->stream;
@@ -5409,11 +6062,18 @@ nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash,
lhq.replace = 0;
lhq.value = req_impl;
- res = nxt_lvlhsh_insert(request_hash, &lhq);
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
+ pthread_mutex_lock(&ctx_impl->mutex);
+
+ res = nxt_lvlhsh_insert(&ctx_impl->requests, &lhq);
+
+ pthread_mutex_unlock(&ctx_impl->mutex);
switch (res) {
case NXT_OK:
+ req_impl->in_hash = 1;
return NXT_UNIT_OK;
default:
@@ -5422,12 +6082,13 @@ nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash,
}
-static nxt_unit_request_info_impl_t *
-nxt_unit_request_hash_find(nxt_lvlhsh_t *request_hash, uint32_t stream,
- int remove)
+static nxt_unit_request_info_t *
+nxt_unit_request_hash_find(nxt_unit_ctx_t *ctx, uint32_t stream, int remove)
{
- nxt_int_t res;
- nxt_lvlhsh_query_t lhq;
+ nxt_int_t res;
+ nxt_lvlhsh_query_t lhq;
+ nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_request_info_impl_t *req_impl;
lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(stream));
lhq.key.length = sizeof(stream);
@@ -5435,16 +6096,26 @@ nxt_unit_request_hash_find(nxt_lvlhsh_t *request_hash, uint32_t stream,
lhq.proto = &lvlhsh_requests_proto;
lhq.pool = NULL;
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
+ pthread_mutex_lock(&ctx_impl->mutex);
+
if (remove) {
- res = nxt_lvlhsh_delete(request_hash, &lhq);
+ res = nxt_lvlhsh_delete(&ctx_impl->requests, &lhq);
} else {
- res = nxt_lvlhsh_find(request_hash, &lhq);
+ res = nxt_lvlhsh_find(&ctx_impl->requests, &lhq);
}
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
switch (res) {
case NXT_OK:
+ req_impl = nxt_container_of(lhq.value, nxt_unit_request_info_impl_t,
+ req);
+ req_impl->in_hash = 0;
+
return lhq.value;
default: