summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2020-04-10 16:21:58 +0300
committerMax Romanov <max.romanov@nginx.com>2020-04-10 16:21:58 +0300
commit58cc13ab291cac5b13462006e3feb780178ef5f3 (patch)
tree0c52174f577f308b097922a1f5c8f14f86169355
parentc7f5c1c6641838006088524c2122eae8f9c30431 (diff)
downloadunit-58cc13ab291cac5b13462006e3feb780178ef5f3.tar.gz
unit-58cc13ab291cac5b13462006e3feb780178ef5f3.tar.bz2
Resolving a racing condition while adding ports on the app's side.
An earlier attempt (ad6265786871) to resolve this condition on the router's side added a new issue: the app could get a request before acquiring a port.
-rw-r--r--go/port.go6
-rw-r--r--src/nxt_process.c17
-rw-r--r--src/nxt_process.h4
-rw-r--r--src/nxt_router.c6
-rw-r--r--src/nxt_unit.c28
5 files changed, 48 insertions, 13 deletions
diff --git a/go/port.go b/go/port.go
index 72d33d31..59a13f8b 100644
--- a/go/port.go
+++ b/go/port.go
@@ -50,7 +50,11 @@ func add_port(p *port) {
port_registry_.m = make(map[port_key]*port)
}
- port_registry_.m[p.key] = p
+ old := port_registry_.m[p.key]
+
+ if old == nil {
+ port_registry_.m[p.key] = p
+ }
port_registry_.Unlock()
}
diff --git a/src/nxt_process.c b/src/nxt_process.c
index 4179844b..f5959edf 100644
--- a/src/nxt_process.c
+++ b/src/nxt_process.c
@@ -591,6 +591,17 @@ nxt_process_close_ports(nxt_task_t *task, nxt_process_t *process)
void
+nxt_process_connected_port_add(nxt_process_t *process, nxt_port_t *port)
+{
+ nxt_thread_mutex_lock(&process->cp_mutex);
+
+ nxt_port_hash_add(&process->connected_ports, port);
+
+ nxt_thread_mutex_unlock(&process->cp_mutex);
+}
+
+
+void
nxt_process_connected_port_remove(nxt_process_t *process, nxt_port_t *port)
{
nxt_thread_mutex_lock(&process->cp_mutex);
@@ -602,7 +613,7 @@ nxt_process_connected_port_remove(nxt_process_t *process, nxt_port_t *port)
nxt_port_t *
-nxt_process_connected_port_find_add(nxt_process_t *process, nxt_port_t *port)
+nxt_process_connected_port_find(nxt_process_t *process, nxt_port_t *port)
{
nxt_port_t *res;
@@ -610,10 +621,6 @@ nxt_process_connected_port_find_add(nxt_process_t *process, nxt_port_t *port)
res = nxt_port_hash_find(&process->connected_ports, port->pid, port->id);
- if (nxt_slow_path(res == NULL)) {
- nxt_port_hash_add(&process->connected_ports, port);
- }
-
nxt_thread_mutex_unlock(&process->cp_mutex);
return res;
diff --git a/src/nxt_process.h b/src/nxt_process.h
index 0c51adfb..3f7155c8 100644
--- a/src/nxt_process.h
+++ b/src/nxt_process.h
@@ -105,10 +105,12 @@ void nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i);
void nxt_process_close_ports(nxt_task_t *task, nxt_process_t *process);
+void nxt_process_connected_port_add(nxt_process_t *process, nxt_port_t *port);
+
void nxt_process_connected_port_remove(nxt_process_t *process,
nxt_port_t *port);
-nxt_port_t *nxt_process_connected_port_find_add(nxt_process_t *process,
+nxt_port_t *nxt_process_connected_port_find(nxt_process_t *process,
nxt_port_t *port);
void nxt_worker_process_quit_handler(nxt_task_t *task,
diff --git a/src/nxt_router.c b/src/nxt_router.c
index a70b03d1..2f4ea698 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -4811,7 +4811,7 @@ nxt_router_app_prepare_request(nxt_task_t *task,
apr_action = NXT_APR_REQUEST_FAILED;
- c_port = nxt_process_connected_port_find_add(port->process, reply_port);
+ c_port = nxt_process_connected_port_find(port->process, reply_port);
if (nxt_slow_path(c_port != reply_port)) {
res = nxt_port_send_port(task, port, reply_port, 0);
@@ -4820,10 +4820,10 @@ nxt_router_app_prepare_request(nxt_task_t *task,
nxt_request_app_link_error(task, port->app, req_app_link,
"Failed to send reply port to application");
- nxt_process_connected_port_remove(port->process, reply_port);
-
goto release_port;
}
+
+ nxt_process_connected_port_add(port->process, reply_port);
}
buf = nxt_router_prepare_msg(task, req_app_link->request, port,
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index c2e7f198..67244420 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -4282,16 +4282,38 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
int rc;
nxt_unit_impl_t *lib;
nxt_unit_process_t *process;
- nxt_unit_port_impl_t *new_port;
+ nxt_unit_port_impl_t *new_port, *old_port;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ pthread_mutex_lock(&lib->mutex);
+
+ old_port = nxt_unit_port_hash_find(&lib->ports, &port->id, 0);
+
+ if (nxt_slow_path(old_port != NULL)) {
+ nxt_unit_debug(ctx, "add_port: duplicate %d,%d in_fd %d out_fd %d",
+ port->id.pid, port->id.id,
+ port->in_fd, port->out_fd);
+
+ if (port->in_fd != -1) {
+ close(port->in_fd);
+ port->in_fd = -1;
+ }
+
+ if (port->out_fd != -1) {
+ close(port->out_fd);
+ port->out_fd = -1;
+ }
+
+ pthread_mutex_unlock(&lib->mutex);
+
+ return NXT_UNIT_OK;
+ }
+
nxt_unit_debug(ctx, "add_port: %d,%d in_fd %d out_fd %d",
port->id.pid, port->id.id,
port->in_fd, port->out_fd);
- pthread_mutex_lock(&lib->mutex);
-
process = nxt_unit_process_get(ctx, port->id.pid);
if (nxt_slow_path(process == NULL)) {
rc = NXT_UNIT_ERROR;