summaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/nxt_unit.c126
1 files changed, 87 insertions, 39 deletions
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index 564fd094..69cae8bb 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -147,6 +147,8 @@ nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port);
nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port);
static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx,
nxt_unit_port_t *port, void *queue);
+static void nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx,
+ nxt_queue_t *awaiting_req);
static void nxt_unit_remove_port(nxt_unit_impl_t *lib,
nxt_unit_port_id_t *port_id);
static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib,
@@ -5340,14 +5342,12 @@ nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port)
static nxt_unit_port_t *
nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue)
{
- int rc;
- nxt_queue_t awaiting_req;
- nxt_unit_impl_t *lib;
- nxt_unit_port_t *old_port;
- nxt_unit_process_t *process;
- nxt_unit_ctx_impl_t *ctx_impl;
- nxt_unit_port_impl_t *new_port, *old_port_impl;
- nxt_unit_request_info_impl_t *req_impl;
+ int rc, ready;
+ nxt_queue_t awaiting_req;
+ nxt_unit_impl_t *lib;
+ nxt_unit_port_t *old_port;
+ nxt_unit_process_t *process;
+ nxt_unit_port_impl_t *new_port, *old_port_impl;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
@@ -5396,46 +5396,45 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue)
old_port_impl->queue = queue;
}
- if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) {
- nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req);
- nxt_queue_init(&old_port_impl->awaiting_req);
- }
-
- old_port_impl->ready = (port->in_fd != -1 || port->out_fd != -1);
+ ready = (port->in_fd != -1 || port->out_fd != -1);
- pthread_mutex_unlock(&lib->mutex);
+ /*
+ * Port can be market as 'ready' only after callbacks.add_port() call.
+ * Otherwise, request may try to use the port before callback.
+ */
+ if (lib->callbacks.add_port == NULL && ready) {
+ old_port_impl->ready = ready;
- if (lib->callbacks.add_port != NULL
- && (port->in_fd != -1 || port->out_fd != -1))
- {
- lib->callbacks.add_port(ctx, old_port);
+ if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) {
+ nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req);
+ nxt_queue_init(&old_port_impl->awaiting_req);
+ }
}
- nxt_queue_each(req_impl, &awaiting_req,
- nxt_unit_request_info_impl_t, port_wait_link)
- {
- nxt_queue_remove(&req_impl->port_wait_link);
-
- ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t,
- ctx);
+ pthread_mutex_unlock(&lib->mutex);
- pthread_mutex_lock(&ctx_impl->mutex);
+ if (lib->callbacks.add_port != NULL && ready) {
+ lib->callbacks.add_port(ctx, old_port);
- nxt_queue_insert_tail(&ctx_impl->ready_req,
- &req_impl->port_wait_link);
+ pthread_mutex_lock(&lib->mutex);
- pthread_mutex_unlock(&ctx_impl->mutex);
+ old_port_impl->ready = ready;
- nxt_atomic_fetch_add(&ctx_impl->wait_items, -1);
+ if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) {
+ nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req);
+ nxt_queue_init(&old_port_impl->awaiting_req);
+ }
- nxt_unit_awake_ctx(ctx, ctx_impl);
+ pthread_mutex_unlock(&lib->mutex);
+ }
- } nxt_queue_loop;
+ nxt_unit_process_awaiting_req(ctx, &awaiting_req);
return old_port;
}
new_port = NULL;
+ ready = 0;
nxt_unit_debug(ctx, "add_port: port{%d,%d} in_fd %d out_fd %d queue %p",
port->id.pid, port->id.id,
@@ -5478,13 +5477,21 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue)
new_port->use_count = 2;
new_port->process = process;
- new_port->ready = (port->in_fd != -1 || port->out_fd != -1);
new_port->queue = queue;
new_port->from_socket = 0;
new_port->socket_rbuf = NULL;
nxt_queue_init(&new_port->awaiting_req);
+ ready = (port->in_fd != -1 || port->out_fd != -1);
+
+ if (lib->callbacks.add_port == NULL) {
+ new_port->ready = ready;
+
+ } else {
+ new_port->ready = 0;
+ }
+
process = NULL;
unlock:
@@ -5495,14 +5502,55 @@ unlock:
nxt_unit_process_release(process);
}
- if (lib->callbacks.add_port != NULL
- && new_port != NULL
- && (port->in_fd != -1 || port->out_fd != -1))
- {
+ if (lib->callbacks.add_port != NULL && new_port != NULL && ready) {
lib->callbacks.add_port(ctx, &new_port->port);
+
+ nxt_queue_init(&awaiting_req);
+
+ pthread_mutex_lock(&lib->mutex);
+
+ new_port->ready = 1;
+
+ if (!nxt_queue_is_empty(&new_port->awaiting_req)) {
+ nxt_queue_add(&awaiting_req, &new_port->awaiting_req);
+ nxt_queue_init(&new_port->awaiting_req);
+ }
+
+ pthread_mutex_unlock(&lib->mutex);
+
+ nxt_unit_process_awaiting_req(ctx, &awaiting_req);
}
- return &new_port->port;
+ return (new_port == NULL) ? NULL : &new_port->port;
+}
+
+
+static void
+nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx, nxt_queue_t *awaiting_req)
+{
+ nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_request_info_impl_t *req_impl;
+
+ nxt_queue_each(req_impl, awaiting_req,
+ nxt_unit_request_info_impl_t, port_wait_link)
+ {
+ nxt_queue_remove(&req_impl->port_wait_link);
+
+ ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t,
+ ctx);
+
+ pthread_mutex_lock(&ctx_impl->mutex);
+
+ nxt_queue_insert_tail(&ctx_impl->ready_req,
+ &req_impl->port_wait_link);
+
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
+ nxt_atomic_fetch_add(&ctx_impl->wait_items, -1);
+
+ nxt_unit_awake_ctx(ctx, ctx_impl);
+
+ } nxt_queue_loop;
}