diff options
author | Max Romanov <max.romanov@nginx.com> | 2020-11-18 22:33:53 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2020-11-18 22:33:53 +0300 |
commit | 0ec69aa46e3577ef44060b9dd8576faab4a863ce (patch) | |
tree | 05710eb57b916092d787706a25504b129ae7b8cb /src/nxt_unit.c | |
parent | 8340ca0b9c7ad4109033ccb028f87cc1b73396bc (diff) | |
download | unit-0ec69aa46e3577ef44060b9dd8576faab4a863ce.tar.gz unit-0ec69aa46e3577ef44060b9dd8576faab4a863ce.tar.bz2 |
Libunit: fixing racing condition for port add / state change.
The issue only occurred in Go applications because "port_send" is overloaded
only in Go. To reproduce it, send multiple concurrent requests to the
application after it has initialised. The warning message "[unit] [go] port
NNN:dd not found" is the first visible aspect of the issue; the second and more
valuable one is a closed connection, an error response, or a hanging response to
some requests.
When the application starts, it is unaware of the router's worker thread ports,
so it requests the ports from the router after receiving requests from the
corresponding router worker threads. When multiple requests are processed
simultaneously, the router port may be required by several requests, so request
processing starts only after the application receives the required port
information. The port should be added to the Go port repository after its
'ready' flag is updated. Otherwise, Unit may start processing some requests and
use the port before it is in the repository.
The issue was introduced in changeset 78836321a126.
Diffstat (limited to 'src/nxt_unit.c')
-rw-r--r-- | src/nxt_unit.c | 126 |
1 files changed, 87 insertions, 39 deletions
diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 564fd094..69cae8bb 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -147,6 +147,8 @@ nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port); nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port); static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue); +static void nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx, + nxt_queue_t *awaiting_req); static void nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id); static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, @@ -5340,14 +5342,12 @@ nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port) static nxt_unit_port_t * nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue) { - int rc; - nxt_queue_t awaiting_req; - nxt_unit_impl_t *lib; - nxt_unit_port_t *old_port; - nxt_unit_process_t *process; - nxt_unit_ctx_impl_t *ctx_impl; - nxt_unit_port_impl_t *new_port, *old_port_impl; - nxt_unit_request_info_impl_t *req_impl; + int rc, ready; + nxt_queue_t awaiting_req; + nxt_unit_impl_t *lib; + nxt_unit_port_t *old_port; + nxt_unit_process_t *process; + nxt_unit_port_impl_t *new_port, *old_port_impl; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); @@ -5396,46 +5396,45 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue) old_port_impl->queue = queue; } - if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) { - nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req); - nxt_queue_init(&old_port_impl->awaiting_req); - } - - old_port_impl->ready = (port->in_fd != -1 || port->out_fd != -1); + ready = (port->in_fd != -1 || port->out_fd != -1); - pthread_mutex_unlock(&lib->mutex); + /* + * Port can be market as 'ready' only after callbacks.add_port() call. + * Otherwise, request may try to use the port before callback. + */ + if (lib->callbacks.add_port == NULL && ready) { + old_port_impl->ready = ready; - if (lib->callbacks.add_port != NULL - && (port->in_fd != -1 || port->out_fd != -1)) - { - lib->callbacks.add_port(ctx, old_port); + if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) { + nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req); + nxt_queue_init(&old_port_impl->awaiting_req); + } } - nxt_queue_each(req_impl, &awaiting_req, - nxt_unit_request_info_impl_t, port_wait_link) - { - nxt_queue_remove(&req_impl->port_wait_link); - - ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, - ctx); + pthread_mutex_unlock(&lib->mutex); - pthread_mutex_lock(&ctx_impl->mutex); + if (lib->callbacks.add_port != NULL && ready) { + lib->callbacks.add_port(ctx, old_port); - nxt_queue_insert_tail(&ctx_impl->ready_req, - &req_impl->port_wait_link); + pthread_mutex_lock(&lib->mutex); - pthread_mutex_unlock(&ctx_impl->mutex); + old_port_impl->ready = ready; - nxt_atomic_fetch_add(&ctx_impl->wait_items, -1); + if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) { + nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req); + nxt_queue_init(&old_port_impl->awaiting_req); + } - nxt_unit_awake_ctx(ctx, ctx_impl); + pthread_mutex_unlock(&lib->mutex); + } - } nxt_queue_loop; + nxt_unit_process_awaiting_req(ctx, &awaiting_req); return old_port; } new_port = NULL; + ready = 0; nxt_unit_debug(ctx, "add_port: port{%d,%d} in_fd %d out_fd %d queue %p", port->id.pid, port->id.id, @@ -5478,13 +5477,21 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue) new_port->use_count = 2; new_port->process = process; - new_port->ready = (port->in_fd != -1 || port->out_fd != -1); new_port->queue = queue; new_port->from_socket = 0; new_port->socket_rbuf = NULL; nxt_queue_init(&new_port->awaiting_req); + ready = (port->in_fd != -1 || port->out_fd != -1); + + if (lib->callbacks.add_port == NULL) { + new_port->ready = ready; + + } else { + new_port->ready = 0; + } + process = NULL; unlock: @@ -5495,14 +5502,55 @@ unlock: nxt_unit_process_release(process); } - if (lib->callbacks.add_port != NULL - && new_port != NULL - && (port->in_fd != -1 || port->out_fd != -1)) - { + if (lib->callbacks.add_port != NULL && new_port != NULL && ready) { lib->callbacks.add_port(ctx, &new_port->port); + + nxt_queue_init(&awaiting_req); + + pthread_mutex_lock(&lib->mutex); + + new_port->ready = 1; + + if (!nxt_queue_is_empty(&new_port->awaiting_req)) { + nxt_queue_add(&awaiting_req, &new_port->awaiting_req); + nxt_queue_init(&new_port->awaiting_req); + } + + pthread_mutex_unlock(&lib->mutex); + + nxt_unit_process_awaiting_req(ctx, &awaiting_req); } - return &new_port->port; + return (new_port == NULL) ? NULL : &new_port->port; +} + + +static void +nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx, nxt_queue_t *awaiting_req) +{ + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_request_info_impl_t *req_impl; + + nxt_queue_each(req_impl, awaiting_req, + nxt_unit_request_info_impl_t, port_wait_link) + { + nxt_queue_remove(&req_impl->port_wait_link); + + ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, + ctx); + + pthread_mutex_lock(&ctx_impl->mutex); + + nxt_queue_insert_tail(&ctx_impl->ready_req, + &req_impl->port_wait_link); + + pthread_mutex_unlock(&ctx_impl->mutex); + + nxt_atomic_fetch_add(&ctx_impl->wait_items, -1); + + nxt_unit_awake_ctx(ctx, ctx_impl); + + } nxt_queue_loop; } |