diff options
Diffstat (limited to 'src/nxt_unit.c')
-rw-r--r-- | src/nxt_unit.c | 96 |
1 files changed, 80 insertions, 16 deletions
diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 28a0de20..9ccd1fd9 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -31,7 +31,7 @@ typedef struct nxt_unit_request_info_impl_s nxt_unit_request_info_impl_t; typedef struct nxt_unit_websocket_frame_impl_s nxt_unit_websocket_frame_impl_t; static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init); -static void nxt_unit_ctx_init(nxt_unit_impl_t *lib, +static int nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, void *data); nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head, nxt_unit_mmap_buf_t *mmap_buf); @@ -204,6 +204,8 @@ struct nxt_unit_websocket_frame_impl_s { struct nxt_unit_ctx_impl_s { nxt_unit_ctx_t ctx; + pthread_mutex_t mutex; + nxt_unit_port_id_t read_port_id; int read_port_fd; @@ -331,6 +333,7 @@ nxt_unit_init(nxt_unit_init_t *init) } } + lib->pid = read_port.id.pid; ctx = &lib->main_ctx.ctx; rc = lib->callbacks.add_port(ctx, &ready_port); @@ -396,13 +399,15 @@ nxt_unit_create(nxt_unit_init_t *init) lib->processes.slot = NULL; lib->ports.slot = NULL; - lib->pid = getpid(); lib->log_fd = STDERR_FILENO; lib->online = 1; nxt_queue_init(&lib->contexts); - nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data); + rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + goto fail; + } cb = &lib->callbacks; @@ -446,15 +451,24 @@ fail: } -static void +static int nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, void *data) { + int rc; + ctx_impl->ctx.data = data; ctx_impl->ctx.unit = &lib->unit; nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link); + rc = pthread_mutex_init(&ctx_impl->mutex, NULL); + if (nxt_slow_path(rc != 0)) { + nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc); + + return NXT_UNIT_ERROR; + } + nxt_queue_init(&ctx_impl->free_req); nxt_queue_init(&ctx_impl->free_ws); nxt_queue_init(&ctx_impl->active_req); @@ -470,6 +484,8 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, ctx_impl->read_port_fd = -1; ctx_impl->requests.slot = 0; + + return NXT_UNIT_OK; } @@ -962,6 +978,11 @@ nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) } else { b = nxt_unit_mmap_buf_get(ctx); if (nxt_slow_path(b == NULL)) { + nxt_unit_alert(ctx, "#%"PRIu32": failed to allocate buf", + req_impl->stream); + + nxt_unit_websocket_frame_release(&ws_impl->ws); + return NXT_UNIT_ERROR; } @@ -1029,18 +1050,22 @@ nxt_unit_request_info_get(nxt_unit_ctx_t *ctx) lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + pthread_mutex_lock(&ctx_impl->mutex); + if (nxt_queue_is_empty(&ctx_impl->free_req)) { + pthread_mutex_unlock(&ctx_impl->mutex); + req_impl = malloc(sizeof(nxt_unit_request_info_impl_t) + lib->request_data_size); if (nxt_slow_path(req_impl == NULL)) { - nxt_unit_warn(ctx, "request info allocation failed"); - return NULL; } req_impl->req.unit = ctx->unit; req_impl->req.ctx = ctx; + pthread_mutex_lock(&ctx_impl->mutex); + } else { lnk = nxt_queue_first(&ctx_impl->free_req); nxt_queue_remove(lnk); @@ -1050,6 +1075,8 @@ nxt_unit_request_info_get(nxt_unit_ctx_t *ctx) nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link); + pthread_mutex_unlock(&ctx_impl->mutex); + req_impl->req.data = lib->request_data_size ? req_impl->extra_data : NULL; return req_impl; @@ -1068,12 +1095,6 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req) req->response = NULL; req->response_buf = NULL; - if (req_impl->process != NULL) { - nxt_unit_process_use(req->ctx, req_impl->process, -1); - - req_impl->process = NULL; - } - if (req_impl->websocket) { nxt_unit_request_hash_find(&ctx_impl->requests, req_impl->stream, 1); @@ -1088,10 +1109,24 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req) nxt_unit_mmap_buf_free(req_impl->incoming_buf); } + /* + * Process release should go after buffers release to guarantee mmap + * existence. + */ + if (req_impl->process != NULL) { + nxt_unit_process_use(req->ctx, req_impl->process, -1); + + req_impl->process = NULL; + } + + pthread_mutex_lock(&ctx_impl->mutex); + nxt_queue_remove(&req_impl->link); nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link); + pthread_mutex_unlock(&ctx_impl->mutex); + req_impl->state = NXT_UNIT_RS_RELEASED; } @@ -1120,11 +1155,13 @@ nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx) ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + pthread_mutex_lock(&ctx_impl->mutex); + if (nxt_queue_is_empty(&ctx_impl->free_ws)) { + pthread_mutex_unlock(&ctx_impl->mutex); + ws_impl = malloc(sizeof(nxt_unit_websocket_frame_impl_t)); if (nxt_slow_path(ws_impl == NULL)) { - nxt_unit_warn(ctx, "websocket frame allocation failed"); - return NULL; } @@ -1132,6 +1169,8 @@ nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx) lnk = nxt_queue_first(&ctx_impl->free_ws); nxt_queue_remove(lnk); + pthread_mutex_unlock(&ctx_impl->mutex); + ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link); } @@ -1160,7 +1199,11 @@ nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws) ws_impl->retain_buf = NULL; } + pthread_mutex_lock(&ws_impl->ctx_impl->mutex); + nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link); + + pthread_mutex_unlock(&ws_impl->ctx_impl->mutex); } @@ -1635,6 +1678,8 @@ nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size) mmap_buf = nxt_unit_mmap_buf_get(req->ctx); if (nxt_slow_path(mmap_buf == NULL)) { + nxt_unit_req_alert(req, "response_buf_alloc: failed to allocate buf"); + return NULL; } @@ -1688,16 +1733,22 @@ nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx) ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + pthread_mutex_lock(&ctx_impl->mutex); + if (ctx_impl->free_buf == NULL) { + pthread_mutex_unlock(&ctx_impl->mutex); + mmap_buf = malloc(sizeof(nxt_unit_mmap_buf_t)); if (nxt_slow_path(mmap_buf == NULL)) { - nxt_unit_warn(ctx, "failed to allocate buf"); + return NULL; } } else { mmap_buf = ctx_impl->free_buf; nxt_unit_mmap_buf_remove(mmap_buf); + + pthread_mutex_unlock(&ctx_impl->mutex); } mmap_buf->ctx_impl = ctx_impl; @@ -1711,7 +1762,11 @@ nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf) { nxt_unit_mmap_buf_remove(mmap_buf); + pthread_mutex_lock(&mmap_buf->ctx_impl->mutex); + nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf); + + pthread_mutex_unlock(&mmap_buf->ctx_impl->mutex); } @@ -3298,7 +3353,14 @@ nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data) close(fd); - nxt_unit_ctx_init(lib, new_ctx, data); + rc = nxt_unit_ctx_init(lib, new_ctx, data); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + lib->callbacks.remove_port(ctx, &new_port_id); + + free(new_ctx); + + return NULL; + } new_ctx->read_port_id = new_port_id; @@ -3350,6 +3412,8 @@ nxt_unit_ctx_free(nxt_unit_ctx_t *ctx) } nxt_queue_loop; + pthread_mutex_destroy(&ctx_impl->mutex); + nxt_queue_remove(&ctx_impl->link); if (ctx_impl != &lib->main_ctx) { |