diff options
-rw-r--r-- | go/port.go | 6 | ||||
-rw-r--r-- | src/nxt_process.c | 17 | ||||
-rw-r--r-- | src/nxt_process.h | 4 | ||||
-rw-r--r-- | src/nxt_router.c | 6 | ||||
-rw-r--r-- | src/nxt_unit.c | 28 |
5 files changed, 48 insertions, 13 deletions
@@ -50,7 +50,11 @@ func add_port(p *port) { port_registry_.m = make(map[port_key]*port) } - port_registry_.m[p.key] = p + old := port_registry_.m[p.key] + + if old == nil { + port_registry_.m[p.key] = p + } port_registry_.Unlock() } diff --git a/src/nxt_process.c b/src/nxt_process.c index 4179844b..f5959edf 100644 --- a/src/nxt_process.c +++ b/src/nxt_process.c @@ -591,6 +591,17 @@ nxt_process_close_ports(nxt_task_t *task, nxt_process_t *process) void +nxt_process_connected_port_add(nxt_process_t *process, nxt_port_t *port) +{ + nxt_thread_mutex_lock(&process->cp_mutex); + + nxt_port_hash_add(&process->connected_ports, port); + + nxt_thread_mutex_unlock(&process->cp_mutex); +} + + +void nxt_process_connected_port_remove(nxt_process_t *process, nxt_port_t *port) { nxt_thread_mutex_lock(&process->cp_mutex); @@ -602,7 +613,7 @@ nxt_process_connected_port_remove(nxt_process_t *process, nxt_port_t *port) nxt_port_t * -nxt_process_connected_port_find_add(nxt_process_t *process, nxt_port_t *port) +nxt_process_connected_port_find(nxt_process_t *process, nxt_port_t *port) { nxt_port_t *res; @@ -610,10 +621,6 @@ nxt_process_connected_port_find_add(nxt_process_t *process, nxt_port_t *port) res = nxt_port_hash_find(&process->connected_ports, port->pid, port->id); - if (nxt_slow_path(res == NULL)) { - nxt_port_hash_add(&process->connected_ports, port); - } - nxt_thread_mutex_unlock(&process->cp_mutex); return res; diff --git a/src/nxt_process.h b/src/nxt_process.h index 0c51adfb..3f7155c8 100644 --- a/src/nxt_process.h +++ b/src/nxt_process.h @@ -105,10 +105,12 @@ void nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i); void nxt_process_close_ports(nxt_task_t *task, nxt_process_t *process); +void nxt_process_connected_port_add(nxt_process_t *process, nxt_port_t *port); + void nxt_process_connected_port_remove(nxt_process_t *process, nxt_port_t *port); -nxt_port_t *nxt_process_connected_port_find_add(nxt_process_t *process, +nxt_port_t *nxt_process_connected_port_find(nxt_process_t *process, nxt_port_t *port); void nxt_worker_process_quit_handler(nxt_task_t *task, diff --git a/src/nxt_router.c b/src/nxt_router.c index a70b03d1..2f4ea698 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -4811,7 +4811,7 @@ nxt_router_app_prepare_request(nxt_task_t *task, apr_action = NXT_APR_REQUEST_FAILED; - c_port = nxt_process_connected_port_find_add(port->process, reply_port); + c_port = nxt_process_connected_port_find(port->process, reply_port); if (nxt_slow_path(c_port != reply_port)) { res = nxt_port_send_port(task, port, reply_port, 0); @@ -4820,10 +4820,10 @@ nxt_router_app_prepare_request(nxt_task_t *task, nxt_request_app_link_error(task, port->app, req_app_link, "Failed to send reply port to application"); - nxt_process_connected_port_remove(port->process, reply_port); - goto release_port; } + + nxt_process_connected_port_add(port->process, reply_port); } buf = nxt_router_prepare_msg(task, req_app_link->request, port, diff --git a/src/nxt_unit.c b/src/nxt_unit.c index c2e7f198..67244420 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -4282,16 +4282,38 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) int rc; nxt_unit_impl_t *lib; nxt_unit_process_t *process; - nxt_unit_port_impl_t *new_port; + nxt_unit_port_impl_t *new_port, *old_port; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + pthread_mutex_lock(&lib->mutex); + + old_port = nxt_unit_port_hash_find(&lib->ports, &port->id, 0); + + if (nxt_slow_path(old_port != NULL)) { + nxt_unit_debug(ctx, "add_port: duplicate %d,%d in_fd %d out_fd %d", + port->id.pid, port->id.id, + port->in_fd, port->out_fd); + + if (port->in_fd != -1) { + close(port->in_fd); + port->in_fd = -1; + } + + if (port->out_fd != -1) { + close(port->out_fd); + port->out_fd = -1; + } + + pthread_mutex_unlock(&lib->mutex); + + return NXT_UNIT_OK; + } + nxt_unit_debug(ctx, "add_port: %d,%d in_fd %d out_fd %d", port->id.pid, port->id.id, port->in_fd, port->out_fd); - pthread_mutex_lock(&lib->mutex); - process = nxt_unit_process_get(ctx, port->id.pid); if (nxt_slow_path(process == NULL)) { rc = NXT_UNIT_ERROR; |