summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_unit.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nxt_unit.c')
-rw-r--r--src/nxt_unit.c96
1 files changed, 80 insertions, 16 deletions
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index 28a0de20..9ccd1fd9 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -31,7 +31,7 @@ typedef struct nxt_unit_request_info_impl_s nxt_unit_request_info_impl_t;
typedef struct nxt_unit_websocket_frame_impl_s nxt_unit_websocket_frame_impl_t;
static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init);
-static void nxt_unit_ctx_init(nxt_unit_impl_t *lib,
+static int nxt_unit_ctx_init(nxt_unit_impl_t *lib,
nxt_unit_ctx_impl_t *ctx_impl, void *data);
nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
nxt_unit_mmap_buf_t *mmap_buf);
@@ -204,6 +204,8 @@ struct nxt_unit_websocket_frame_impl_s {
struct nxt_unit_ctx_impl_s {
nxt_unit_ctx_t ctx;
+ pthread_mutex_t mutex;
+
nxt_unit_port_id_t read_port_id;
int read_port_fd;
@@ -331,6 +333,7 @@ nxt_unit_init(nxt_unit_init_t *init)
}
}
+ lib->pid = read_port.id.pid;
ctx = &lib->main_ctx.ctx;
rc = lib->callbacks.add_port(ctx, &ready_port);
@@ -396,13 +399,15 @@ nxt_unit_create(nxt_unit_init_t *init)
lib->processes.slot = NULL;
lib->ports.slot = NULL;
- lib->pid = getpid();
lib->log_fd = STDERR_FILENO;
lib->online = 1;
nxt_queue_init(&lib->contexts);
- nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
+ rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ goto fail;
+ }
cb = &lib->callbacks;
@@ -446,15 +451,24 @@ fail:
}
-static void
+static int
nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
void *data)
{
+ int rc;
+
ctx_impl->ctx.data = data;
ctx_impl->ctx.unit = &lib->unit;
nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
+ rc = pthread_mutex_init(&ctx_impl->mutex, NULL);
+ if (nxt_slow_path(rc != 0)) {
+ nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
+
+ return NXT_UNIT_ERROR;
+ }
+
nxt_queue_init(&ctx_impl->free_req);
nxt_queue_init(&ctx_impl->free_ws);
nxt_queue_init(&ctx_impl->active_req);
@@ -470,6 +484,8 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
ctx_impl->read_port_fd = -1;
ctx_impl->requests.slot = 0;
+
+ return NXT_UNIT_OK;
}
@@ -962,6 +978,11 @@ nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
} else {
b = nxt_unit_mmap_buf_get(ctx);
if (nxt_slow_path(b == NULL)) {
+ nxt_unit_alert(ctx, "#%"PRIu32": failed to allocate buf",
+ req_impl->stream);
+
+ nxt_unit_websocket_frame_release(&ws_impl->ws);
+
return NXT_UNIT_ERROR;
}
@@ -1029,18 +1050,22 @@ nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ pthread_mutex_lock(&ctx_impl->mutex);
+
if (nxt_queue_is_empty(&ctx_impl->free_req)) {
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
req_impl = malloc(sizeof(nxt_unit_request_info_impl_t)
+ lib->request_data_size);
if (nxt_slow_path(req_impl == NULL)) {
- nxt_unit_warn(ctx, "request info allocation failed");
-
return NULL;
}
req_impl->req.unit = ctx->unit;
req_impl->req.ctx = ctx;
+ pthread_mutex_lock(&ctx_impl->mutex);
+
} else {
lnk = nxt_queue_first(&ctx_impl->free_req);
nxt_queue_remove(lnk);
@@ -1050,6 +1075,8 @@ nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link);
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
req_impl->req.data = lib->request_data_size ? req_impl->extra_data : NULL;
return req_impl;
@@ -1068,12 +1095,6 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req)
req->response = NULL;
req->response_buf = NULL;
- if (req_impl->process != NULL) {
- nxt_unit_process_use(req->ctx, req_impl->process, -1);
-
- req_impl->process = NULL;
- }
-
if (req_impl->websocket) {
nxt_unit_request_hash_find(&ctx_impl->requests, req_impl->stream, 1);
@@ -1088,10 +1109,24 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req)
nxt_unit_mmap_buf_free(req_impl->incoming_buf);
}
+ /*
+ * Process release should go after buffers release to guarantee mmap
+ * existence.
+ */
+ if (req_impl->process != NULL) {
+ nxt_unit_process_use(req->ctx, req_impl->process, -1);
+
+ req_impl->process = NULL;
+ }
+
+ pthread_mutex_lock(&ctx_impl->mutex);
+
nxt_queue_remove(&req_impl->link);
nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
req_impl->state = NXT_UNIT_RS_RELEASED;
}
@@ -1120,11 +1155,13 @@ nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx)
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+ pthread_mutex_lock(&ctx_impl->mutex);
+
if (nxt_queue_is_empty(&ctx_impl->free_ws)) {
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
ws_impl = malloc(sizeof(nxt_unit_websocket_frame_impl_t));
if (nxt_slow_path(ws_impl == NULL)) {
- nxt_unit_warn(ctx, "websocket frame allocation failed");
-
return NULL;
}
@@ -1132,6 +1169,8 @@ nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx)
lnk = nxt_queue_first(&ctx_impl->free_ws);
nxt_queue_remove(lnk);
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link);
}
@@ -1160,7 +1199,11 @@ nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws)
ws_impl->retain_buf = NULL;
}
+ pthread_mutex_lock(&ws_impl->ctx_impl->mutex);
+
nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link);
+
+ pthread_mutex_unlock(&ws_impl->ctx_impl->mutex);
}
@@ -1635,6 +1678,8 @@ nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
if (nxt_slow_path(mmap_buf == NULL)) {
+ nxt_unit_req_alert(req, "response_buf_alloc: failed to allocate buf");
+
return NULL;
}
@@ -1688,16 +1733,22 @@ nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+ pthread_mutex_lock(&ctx_impl->mutex);
+
if (ctx_impl->free_buf == NULL) {
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
mmap_buf = malloc(sizeof(nxt_unit_mmap_buf_t));
if (nxt_slow_path(mmap_buf == NULL)) {
- nxt_unit_warn(ctx, "failed to allocate buf");
+ return NULL;
}
} else {
mmap_buf = ctx_impl->free_buf;
nxt_unit_mmap_buf_remove(mmap_buf);
+
+ pthread_mutex_unlock(&ctx_impl->mutex);
}
mmap_buf->ctx_impl = ctx_impl;
@@ -1711,7 +1762,11 @@ nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf)
{
nxt_unit_mmap_buf_remove(mmap_buf);
+ pthread_mutex_lock(&mmap_buf->ctx_impl->mutex);
+
nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf);
+
+ pthread_mutex_unlock(&mmap_buf->ctx_impl->mutex);
}
@@ -3298,7 +3353,14 @@ nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
close(fd);
- nxt_unit_ctx_init(lib, new_ctx, data);
+ rc = nxt_unit_ctx_init(lib, new_ctx, data);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ lib->callbacks.remove_port(ctx, &new_port_id);
+
+ free(new_ctx);
+
+ return NULL;
+ }
new_ctx->read_port_id = new_port_id;
@@ -3350,6 +3412,8 @@ nxt_unit_ctx_free(nxt_unit_ctx_t *ctx)
} nxt_queue_loop;
+ pthread_mutex_destroy(&ctx_impl->mutex);
+
nxt_queue_remove(&ctx_impl->link);
if (ctx_impl != &lib->main_ctx) {