summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_unit.c
diff options
context:
space:
mode:
authorAndrei Belov <defan@nginx.com>2020-10-08 19:19:31 +0300
committerAndrei Belov <defan@nginx.com>2020-10-08 19:19:31 +0300
commitd586ac9fdc4a86c142b06a75dde4cdacad5b52f6 (patch)
tree9817282396f9d2cf5333050e4b5bf807d3617e40 /src/nxt_unit.c
parent9be35d9b7418c041e5177f273c20f0fd2d3f00ad (diff)
parentad516735a65fe109773b60e26214a071411f1734 (diff)
downloadunit-1.20.0-1.tar.gz
unit-1.20.0-1.tar.bz2
Merged with the default branch.1.20.0-1
Diffstat (limited to '')
-rw-r--r--src/nxt_unit.c243
1 files changed, 172 insertions, 71 deletions
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index 6b7d631d..f75d61bc 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -74,7 +74,8 @@ static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req);
static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get(
nxt_unit_ctx_t *ctx);
static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws);
-static void nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws);
+static void nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
+ nxt_unit_websocket_frame_impl_t *ws);
static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx);
static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf);
static int nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
@@ -119,8 +120,7 @@ static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
nxt_port_mmap_header_t *hdr, void *start, uint32_t size);
static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid);
-static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_impl_t *lib,
- pid_t pid);
+static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid);
static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib,
pid_t pid, int remove);
static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
@@ -184,6 +184,9 @@ static nxt_unit_request_info_t *nxt_unit_request_hash_find(
nxt_unit_ctx_t *ctx, uint32_t stream, int remove);
static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level);
+static void *nxt_unit_lvlhsh_alloc(void *data, size_t size);
+static void nxt_unit_lvlhsh_free(void *data, void *p);
+static int nxt_unit_memcasecmp(const void *p1, const void *p2, size_t length);
struct nxt_unit_mmap_buf_s {
@@ -531,7 +534,8 @@ nxt_unit_create(nxt_unit_init_t *init)
nxt_unit_impl_t *lib;
nxt_unit_callbacks_t *cb;
- lib = malloc(sizeof(nxt_unit_impl_t) + init->request_data_size);
+ lib = nxt_unit_malloc(NULL,
+ sizeof(nxt_unit_impl_t) + init->request_data_size);
if (nxt_slow_path(lib == NULL)) {
nxt_unit_alert(NULL, "failed to allocate unit struct");
@@ -586,7 +590,7 @@ nxt_unit_create(nxt_unit_init_t *init)
fail:
- free(lib);
+ nxt_unit_free(NULL, lib);
return NULL;
}
@@ -710,7 +714,7 @@ nxt_unit_lib_release(nxt_unit_impl_t *lib)
nxt_unit_mmaps_destroy(&lib->incoming);
nxt_unit_mmaps_destroy(&lib->outgoing);
- free(lib);
+ nxt_unit_free(NULL, lib);
}
}
@@ -1388,7 +1392,7 @@ nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
return NXT_UNIT_AGAIN;
}
- port_impl = malloc(sizeof(nxt_unit_port_impl_t));
+ port_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t));
if (nxt_slow_path(port_impl == NULL)) {
nxt_unit_alert(ctx, "check_response_port: malloc(%d) failed",
(int) sizeof(nxt_unit_port_impl_t));
@@ -1412,7 +1416,7 @@ nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
pthread_mutex_unlock(&lib->mutex);
- free(port);
+ nxt_unit_free(ctx, port);
return NXT_UNIT_ERROR;
}
@@ -1426,7 +1430,7 @@ nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
pthread_mutex_unlock(&lib->mutex);
- free(port);
+ nxt_unit_free(ctx, port);
return NXT_UNIT_ERROR;
}
@@ -1634,8 +1638,8 @@ nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
if (nxt_queue_is_empty(&ctx_impl->free_req)) {
pthread_mutex_unlock(&ctx_impl->mutex);
- req_impl = malloc(sizeof(nxt_unit_request_info_impl_t)
- + lib->request_data_size);
+ req_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_request_info_impl_t)
+ + lib->request_data_size);
if (nxt_slow_path(req_impl == NULL)) {
return NULL;
}
@@ -1722,7 +1726,7 @@ nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
nxt_queue_remove(&req_impl->link);
if (req_impl != &ctx_impl->req) {
- free(req_impl);
+ nxt_unit_free(&ctx_impl->ctx, req_impl);
}
}
@@ -1741,7 +1745,7 @@ nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx)
if (nxt_queue_is_empty(&ctx_impl->free_ws)) {
pthread_mutex_unlock(&ctx_impl->mutex);
- ws_impl = malloc(sizeof(nxt_unit_websocket_frame_impl_t));
+ ws_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_websocket_frame_impl_t));
if (nxt_slow_path(ws_impl == NULL)) {
return NULL;
}
@@ -1783,11 +1787,12 @@ nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws)
static void
-nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws_impl)
+nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
+ nxt_unit_websocket_frame_impl_t *ws_impl)
{
nxt_queue_remove(&ws_impl->link);
- free(ws_impl);
+ nxt_unit_free(ctx, ws_impl);
}
@@ -1815,42 +1820,66 @@ nxt_unit_field_hash(const char *name, size_t name_length)
void
nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req)
{
+ char *name;
uint32_t i, j;
nxt_unit_field_t *fields, f;
nxt_unit_request_t *r;
+ static nxt_str_t content_length = nxt_string("content-length");
+ static nxt_str_t content_type = nxt_string("content-type");
+ static nxt_str_t cookie = nxt_string("cookie");
+
nxt_unit_req_debug(req, "group_dup_fields");
r = req->request;
fields = r->fields;
for (i = 0; i < r->fields_count; i++) {
+ name = nxt_unit_sptr_get(&fields[i].name);
switch (fields[i].hash) {
case NXT_UNIT_HASH_CONTENT_LENGTH:
- r->content_length_field = i;
+ if (fields[i].name_length == content_length.length
+ && nxt_unit_memcasecmp(name, content_length.start,
+ content_length.length) == 0)
+ {
+ r->content_length_field = i;
+ }
+
break;
case NXT_UNIT_HASH_CONTENT_TYPE:
- r->content_type_field = i;
+ if (fields[i].name_length == content_type.length
+ && nxt_unit_memcasecmp(name, content_type.start,
+ content_type.length) == 0)
+ {
+ r->content_type_field = i;
+ }
+
break;
case NXT_UNIT_HASH_COOKIE:
- r->cookie_field = i;
+ if (fields[i].name_length == cookie.length
+ && nxt_unit_memcasecmp(name, cookie.start,
+ cookie.length) == 0)
+ {
+ r->cookie_field = i;
+ }
+
break;
- };
+ }
for (j = i + 1; j < r->fields_count; j++) {
- if (fields[i].hash != fields[j].hash) {
- continue;
- }
-
- if (j == i + 1) {
+ if (fields[i].hash != fields[j].hash
+ || fields[i].name_length != fields[j].name_length
+ || nxt_unit_memcasecmp(name,
+ nxt_unit_sptr_get(&fields[j].name),
+ fields[j].name_length) != 0)
+ {
continue;
}
f = fields[j];
- f.name.offset += (j - (i + 1)) * sizeof(f);
f.value.offset += (j - (i + 1)) * sizeof(f);
while (j > i + 1) {
@@ -1862,6 +1891,9 @@ nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req)
fields[j] = f;
+ /* Assign the same name pointer for further grouping simplicity. */
+ nxt_unit_sptr_set(&fields[j].name, name);
+
i++;
}
}
@@ -2297,7 +2329,7 @@ nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
if (ctx_impl->free_buf == NULL) {
pthread_mutex_unlock(&ctx_impl->mutex);
- mmap_buf = malloc(sizeof(nxt_unit_mmap_buf_t));
+ mmap_buf = nxt_unit_malloc(ctx, sizeof(nxt_unit_mmap_buf_t));
if (nxt_slow_path(mmap_buf == NULL)) {
return NULL;
}
@@ -2615,7 +2647,7 @@ nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf)
}
if (mmap_buf->free_ptr != NULL) {
- free(mmap_buf->free_ptr);
+ nxt_unit_free(&mmap_buf->ctx_impl->ctx, mmap_buf->free_ptr);
mmap_buf->free_ptr = NULL;
}
@@ -2657,7 +2689,7 @@ nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl)
return rbuf;
}
- rbuf = malloc(sizeof(nxt_unit_read_buf_t));
+ rbuf = nxt_unit_malloc(&ctx_impl->ctx, sizeof(nxt_unit_read_buf_t));
if (nxt_fast_path(rbuf != NULL)) {
rbuf->ctx_impl = ctx_impl;
@@ -3016,7 +3048,7 @@ nxt_unit_request_preread(nxt_unit_request_info_t *req, size_t size)
return NULL;
}
- mmap_buf->free_ptr = malloc(size);
+ mmap_buf->free_ptr = nxt_unit_malloc(req->ctx, size);
if (nxt_slow_path(mmap_buf->free_ptr == NULL)) {
nxt_unit_req_alert(req, "preread: failed to allocate buf memory");
nxt_unit_mmap_buf_release(mmap_buf);
@@ -3288,7 +3320,7 @@ int
nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws)
{
char *b;
- size_t size;
+ size_t size, hsize;
nxt_unit_websocket_frame_impl_t *ws_impl;
ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
@@ -3299,19 +3331,30 @@ nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws)
size = ws_impl->buf->buf.end - ws_impl->buf->buf.start;
- b = malloc(size);
+ b = nxt_unit_malloc(ws->req->ctx, size);
if (nxt_slow_path(b == NULL)) {
return NXT_UNIT_ERROR;
}
memcpy(b, ws_impl->buf->buf.start, size);
+ hsize = nxt_websocket_frame_header_size(b);
+
ws_impl->buf->buf.start = b;
- ws_impl->buf->buf.free = b;
+ ws_impl->buf->buf.free = b + hsize;
ws_impl->buf->buf.end = b + size;
ws_impl->buf->free_ptr = b;
+ ws_impl->ws.header = (nxt_websocket_header_t *) b;
+
+ if (ws_impl->ws.header->mask) {
+ ws_impl->ws.mask = (uint8_t *) b + hsize - 4;
+
+ } else {
+ ws_impl->ws.mask = NULL;
+ }
+
return NXT_UNIT_OK;
}
@@ -3796,7 +3839,8 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
mmap_buf->plain_ptr = local_buf;
} else {
- mmap_buf->free_ptr = malloc(size + sizeof(nxt_port_msg_t));
+ mmap_buf->free_ptr = nxt_unit_malloc(ctx,
+ size + sizeof(nxt_port_msg_t));
if (nxt_slow_path(mmap_buf->free_ptr == NULL)) {
return NXT_UNIT_ERROR;
}
@@ -3966,7 +4010,7 @@ nxt_unit_process_release(nxt_unit_process_t *process)
if (c == 1) {
nxt_unit_debug(NULL, "destroy process #%d", (int) process->pid);
- free(process);
+ nxt_unit_free(NULL, process);
}
}
@@ -3983,7 +4027,7 @@ nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps)
munmap(mm->hdr, PORT_MMAP_SIZE);
}
- free(mmaps->elts);
+ nxt_unit_free(NULL, mmaps->elts);
}
pthread_mutex_destroy(&mmaps->mutex);
@@ -4255,8 +4299,8 @@ nxt_unit_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
static const nxt_lvlhsh_proto_t lvlhsh_processes_proto nxt_aligned(64) = {
NXT_LVLHSH_DEFAULT,
nxt_unit_lvlhsh_pid_test,
- nxt_lvlhsh_alloc,
- nxt_lvlhsh_free,
+ nxt_unit_lvlhsh_alloc,
+ nxt_unit_lvlhsh_free,
};
@@ -4271,11 +4315,14 @@ nxt_unit_process_lhq_pid(nxt_lvlhsh_query_t *lhq, pid_t *pid)
static nxt_unit_process_t *
-nxt_unit_process_get(nxt_unit_impl_t *lib, pid_t pid)
+nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid)
{
+ nxt_unit_impl_t *lib;
nxt_unit_process_t *process;
nxt_lvlhsh_query_t lhq;
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
nxt_unit_process_lhq_pid(&lhq, &pid);
if (nxt_lvlhsh_find(&lib->processes, &lhq) == NXT_OK) {
@@ -4285,9 +4332,9 @@ nxt_unit_process_get(nxt_unit_impl_t *lib, pid_t pid)
return process;
}
- process = malloc(sizeof(nxt_unit_process_t));
+ process = nxt_unit_malloc(ctx, sizeof(nxt_unit_process_t));
if (nxt_slow_path(process == NULL)) {
- nxt_unit_alert(NULL, "failed to allocate process for #%d", (int) pid);
+ nxt_unit_alert(ctx, "failed to allocate process for #%d", (int) pid);
return NULL;
}
@@ -4308,9 +4355,9 @@ nxt_unit_process_get(nxt_unit_impl_t *lib, pid_t pid)
break;
default:
- nxt_unit_alert(NULL, "process %d insert failed", (int) pid);
+ nxt_unit_alert(ctx, "process %d insert failed", (int) pid);
- free(process);
+ nxt_unit_free(ctx, process);
process = NULL;
break;
}
@@ -4881,7 +4928,8 @@ nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
- new_ctx = malloc(sizeof(nxt_unit_ctx_impl_t) + lib->request_data_size);
+ new_ctx = nxt_unit_malloc(ctx, sizeof(nxt_unit_ctx_impl_t)
+ + lib->request_data_size);
if (nxt_slow_path(new_ctx == NULL)) {
nxt_unit_alert(ctx, "failed to allocate context");
@@ -4890,7 +4938,7 @@ nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
rc = nxt_unit_ctx_init(lib, new_ctx, data);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
- free(new_ctx);
+ nxt_unit_free(ctx, new_ctx);
return NULL;
}
@@ -4969,7 +5017,7 @@ nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl)
while (ctx_impl->free_buf != NULL) {
mmap_buf = ctx_impl->free_buf;
nxt_unit_mmap_buf_unlink(mmap_buf);
- free(mmap_buf);
+ nxt_unit_free(&ctx_impl->ctx, mmap_buf);
}
nxt_queue_each(req_impl, &ctx_impl->free_req,
@@ -4982,7 +5030,7 @@ nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl)
nxt_queue_each(ws_impl, &ctx_impl->free_ws,
nxt_unit_websocket_frame_impl_t, link)
{
- nxt_unit_websocket_frame_free(ws_impl);
+ nxt_unit_websocket_frame_free(&ctx_impl->ctx, ws_impl);
} nxt_queue_loop;
@@ -4996,7 +5044,7 @@ nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl)
}
if (ctx_impl != &lib->main_ctx) {
- free(ctx_impl);
+ nxt_unit_free(&lib->main_ctx.ctx, ctx_impl);
}
nxt_unit_lib_release(lib);
@@ -5048,7 +5096,7 @@ nxt_unit_create_port(nxt_unit_ctx_t *ctx)
pthread_mutex_lock(&lib->mutex);
- process = nxt_unit_process_get(lib, lib->pid);
+ process = nxt_unit_process_get(ctx, lib->pid);
if (nxt_slow_path(process == NULL)) {
pthread_mutex_unlock(&lib->mutex);
@@ -5181,7 +5229,7 @@ nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port)
: sizeof(nxt_port_queue_t));
}
- free(port_impl);
+ nxt_unit_free(NULL, port_impl);
}
}
@@ -5288,7 +5336,7 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue)
port->id.pid, port->id.id,
port->in_fd, port->out_fd, queue);
- process = nxt_unit_process_get(lib, port->id.pid);
+ process = nxt_unit_process_get(ctx, port->id.pid);
if (nxt_slow_path(process == NULL)) {
goto unlock;
}
@@ -5297,7 +5345,7 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue)
process->next_port_id = port->id.id + 1;
}
- new_port = malloc(sizeof(nxt_unit_port_impl_t));
+ new_port = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t));
if (nxt_slow_path(new_port == NULL)) {
nxt_unit_alert(ctx, "add_port: %d,%d malloc() failed",
port->id.pid, port->id.id);
@@ -5312,7 +5360,7 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue)
nxt_unit_alert(ctx, "add_port: %d,%d hash_add failed",
port->id.pid, port->id.id);
- free(new_port);
+ nxt_unit_free(ctx, new_port);
new_port = NULL;
@@ -5716,10 +5764,6 @@ retry:
nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue",
(int) port->id.pid, (int) port->id.id, (int) rbuf->size);
- if (port_impl->from_socket) {
- nxt_unit_warn(ctx, "port protocol warning: READ_QUEUE after READ_SOCKET");
- }
-
goto retry;
}
@@ -5977,8 +6021,8 @@ nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
static const nxt_lvlhsh_proto_t lvlhsh_ports_proto nxt_aligned(64) = {
NXT_LVLHSH_DEFAULT,
nxt_unit_port_hash_test,
- nxt_lvlhsh_alloc,
- nxt_lvlhsh_free,
+ nxt_unit_lvlhsh_alloc,
+ nxt_unit_lvlhsh_free,
};
@@ -6076,8 +6120,8 @@ nxt_unit_request_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
static const nxt_lvlhsh_proto_t lvlhsh_requests_proto nxt_aligned(64) = {
NXT_LVLHSH_DEFAULT,
nxt_unit_request_hash_test,
- nxt_lvlhsh_alloc,
- nxt_lvlhsh_free,
+ nxt_unit_lvlhsh_alloc,
+ nxt_unit_lvlhsh_free,
};
@@ -6158,7 +6202,9 @@ nxt_unit_request_hash_find(nxt_unit_ctx_t *ctx, uint32_t stream, int remove)
case NXT_OK:
req_impl = nxt_container_of(lhq.value, nxt_unit_request_info_impl_t,
req);
- req_impl->in_hash = 0;
+ if (remove) {
+ req_impl->in_hash = 0;
+ }
return lhq.value;
@@ -6307,29 +6353,84 @@ nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level)
}
-/* The function required by nxt_lvlhsh_alloc() and nxt_lvlvhsh_free(). */
-
-void *
-nxt_memalign(size_t alignment, size_t size)
+static void *
+nxt_unit_lvlhsh_alloc(void *data, size_t size)
{
- void *p;
- nxt_err_t err;
+ int err;
+ void *p;
- err = posix_memalign(&p, alignment, size);
+ err = posix_memalign(&p, size, size);
if (nxt_fast_path(err == 0)) {
+ nxt_unit_debug(NULL, "posix_memalign(%d, %d): %p",
+ (int) size, (int) size, p);
return p;
}
+ nxt_unit_alert(NULL, "posix_memalign(%d, %d) failed: %s (%d)",
+ (int) size, (int) size, strerror(err), err);
return NULL;
}
-#if (NXT_DEBUG)
+
+static void
+nxt_unit_lvlhsh_free(void *data, void *p)
+{
+ nxt_unit_free(NULL, p);
+}
+
+
+void *
+nxt_unit_malloc(nxt_unit_ctx_t *ctx, size_t size)
+{
+ void *p;
+
+ p = malloc(size);
+
+ if (nxt_fast_path(p != NULL)) {
+ nxt_unit_debug(ctx, "malloc(%d): %p", (int) size, p);
+
+ } else {
+ nxt_unit_alert(ctx, "malloc(%d) failed: %s (%d)",
+ (int) size, strerror(errno), errno);
+ }
+
+ return p;
+}
+
void
-nxt_free(void *p)
+nxt_unit_free(nxt_unit_ctx_t *ctx, void *p)
{
+ nxt_unit_debug(ctx, "free(%p)", p);
+
free(p);
}
-#endif
+
+static int
+nxt_unit_memcasecmp(const void *p1, const void *p2, size_t length)
+{
+ u_char c1, c2;
+ nxt_int_t n;
+ const u_char *s1, *s2;
+
+ s1 = p1;
+ s2 = p2;
+
+ while (length-- != 0) {
+ c1 = *s1++;
+ c2 = *s2++;
+
+ c1 = nxt_lowcase(c1);
+ c2 = nxt_lowcase(c2);
+
+ n = c1 - c2;
+
+ if (n != 0) {
+ return n;
+ }
+ }
+
+ return 0;
+}