summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_unit.c
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2020-11-18 22:33:53 +0300
committerMax Romanov <max.romanov@nginx.com>2020-11-18 22:33:53 +0300
commit0ec69aa46e3577ef44060b9dd8576faab4a863ce (patch)
tree05710eb57b916092d787706a25504b129ae7b8cb /src/nxt_unit.c
parent8340ca0b9c7ad4109033ccb028f87cc1b73396bc (diff)
downloadunit-0ec69aa46e3577ef44060b9dd8576faab4a863ce.tar.gz
unit-0ec69aa46e3577ef44060b9dd8576faab4a863ce.tar.bz2
Libunit: fixing racing condition for port add / state change.
The issue only occurred in Go applications because "port_send" is overloaded only in Go. To reproduce it, send multiple concurrent requests to the application after it has initialised. The warning message "[unit] [go] port NNN:dd not found" is the first visible aspect of the issue; the second and more valuable one is a closed connection, an error response, or a hanging response to some requests. When the application starts, it is unaware of the router's worker thread ports, so it requests the ports from the router after receiving requests from the corresponding router worker threads. When multiple requests are processed simultaneously, the router port may be required by several requests, so request processing starts only after the application receives the required port information. The port should be added to the Go port repository after its 'ready' flag is updated. Otherwise, Unit may start processing some requests and use the port before it is in the repository. The issue was introduced in changeset 78836321a126.
Diffstat (limited to 'src/nxt_unit.c')
-rw-r--r--src/nxt_unit.c126
1 files changed, 87 insertions, 39 deletions
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index 564fd094..69cae8bb 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -147,6 +147,8 @@ nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port);
nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port);
static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx,
nxt_unit_port_t *port, void *queue);
+static void nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx,
+ nxt_queue_t *awaiting_req);
static void nxt_unit_remove_port(nxt_unit_impl_t *lib,
nxt_unit_port_id_t *port_id);
static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib,
@@ -5340,14 +5342,12 @@ nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port)
static nxt_unit_port_t *
nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue)
{
- int rc;
- nxt_queue_t awaiting_req;
- nxt_unit_impl_t *lib;
- nxt_unit_port_t *old_port;
- nxt_unit_process_t *process;
- nxt_unit_ctx_impl_t *ctx_impl;
- nxt_unit_port_impl_t *new_port, *old_port_impl;
- nxt_unit_request_info_impl_t *req_impl;
+ int rc, ready;
+ nxt_queue_t awaiting_req;
+ nxt_unit_impl_t *lib;
+ nxt_unit_port_t *old_port;
+ nxt_unit_process_t *process;
+ nxt_unit_port_impl_t *new_port, *old_port_impl;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
@@ -5396,46 +5396,45 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue)
old_port_impl->queue = queue;
}
- if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) {
- nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req);
- nxt_queue_init(&old_port_impl->awaiting_req);
- }
-
- old_port_impl->ready = (port->in_fd != -1 || port->out_fd != -1);
+ ready = (port->in_fd != -1 || port->out_fd != -1);
- pthread_mutex_unlock(&lib->mutex);
+ /*
+ * Port can be market as 'ready' only after callbacks.add_port() call.
+ * Otherwise, request may try to use the port before callback.
+ */
+ if (lib->callbacks.add_port == NULL && ready) {
+ old_port_impl->ready = ready;
- if (lib->callbacks.add_port != NULL
- && (port->in_fd != -1 || port->out_fd != -1))
- {
- lib->callbacks.add_port(ctx, old_port);
+ if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) {
+ nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req);
+ nxt_queue_init(&old_port_impl->awaiting_req);
+ }
}
- nxt_queue_each(req_impl, &awaiting_req,
- nxt_unit_request_info_impl_t, port_wait_link)
- {
- nxt_queue_remove(&req_impl->port_wait_link);
-
- ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t,
- ctx);
+ pthread_mutex_unlock(&lib->mutex);
- pthread_mutex_lock(&ctx_impl->mutex);
+ if (lib->callbacks.add_port != NULL && ready) {
+ lib->callbacks.add_port(ctx, old_port);
- nxt_queue_insert_tail(&ctx_impl->ready_req,
- &req_impl->port_wait_link);
+ pthread_mutex_lock(&lib->mutex);
- pthread_mutex_unlock(&ctx_impl->mutex);
+ old_port_impl->ready = ready;
- nxt_atomic_fetch_add(&ctx_impl->wait_items, -1);
+ if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) {
+ nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req);
+ nxt_queue_init(&old_port_impl->awaiting_req);
+ }
- nxt_unit_awake_ctx(ctx, ctx_impl);
+ pthread_mutex_unlock(&lib->mutex);
+ }
- } nxt_queue_loop;
+ nxt_unit_process_awaiting_req(ctx, &awaiting_req);
return old_port;
}
new_port = NULL;
+ ready = 0;
nxt_unit_debug(ctx, "add_port: port{%d,%d} in_fd %d out_fd %d queue %p",
port->id.pid, port->id.id,
@@ -5478,13 +5477,21 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue)
new_port->use_count = 2;
new_port->process = process;
- new_port->ready = (port->in_fd != -1 || port->out_fd != -1);
new_port->queue = queue;
new_port->from_socket = 0;
new_port->socket_rbuf = NULL;
nxt_queue_init(&new_port->awaiting_req);
+ ready = (port->in_fd != -1 || port->out_fd != -1);
+
+ if (lib->callbacks.add_port == NULL) {
+ new_port->ready = ready;
+
+ } else {
+ new_port->ready = 0;
+ }
+
process = NULL;
unlock:
@@ -5495,14 +5502,55 @@ unlock:
nxt_unit_process_release(process);
}
- if (lib->callbacks.add_port != NULL
- && new_port != NULL
- && (port->in_fd != -1 || port->out_fd != -1))
- {
+ if (lib->callbacks.add_port != NULL && new_port != NULL && ready) {
lib->callbacks.add_port(ctx, &new_port->port);
+
+ nxt_queue_init(&awaiting_req);
+
+ pthread_mutex_lock(&lib->mutex);
+
+ new_port->ready = 1;
+
+ if (!nxt_queue_is_empty(&new_port->awaiting_req)) {
+ nxt_queue_add(&awaiting_req, &new_port->awaiting_req);
+ nxt_queue_init(&new_port->awaiting_req);
+ }
+
+ pthread_mutex_unlock(&lib->mutex);
+
+ nxt_unit_process_awaiting_req(ctx, &awaiting_req);
}
- return &new_port->port;
+ return (new_port == NULL) ? NULL : &new_port->port;
+}
+
+
+static void
+nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx, nxt_queue_t *awaiting_req)
+{
+ nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_request_info_impl_t *req_impl;
+
+ nxt_queue_each(req_impl, awaiting_req,
+ nxt_unit_request_info_impl_t, port_wait_link)
+ {
+ nxt_queue_remove(&req_impl->port_wait_link);
+
+ ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t,
+ ctx);
+
+ pthread_mutex_lock(&ctx_impl->mutex);
+
+ nxt_queue_insert_tail(&ctx_impl->ready_req,
+ &req_impl->port_wait_link);
+
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
+ nxt_atomic_fetch_add(&ctx_impl->wait_items, -1);
+
+ nxt_unit_awake_ctx(ctx, ctx_impl);
+
+ } nxt_queue_loop;
}