summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_port.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nxt_port.c')
-rw-r--r--src/nxt_port.c129
1 files changed, 116 insertions, 13 deletions
diff --git a/src/nxt_port.c b/src/nxt_port.c
index d7f42012..3f7dc411 100644
--- a/src/nxt_port.c
+++ b/src/nxt_port.c
@@ -27,13 +27,15 @@ nxt_port_mp_cleanup(nxt_task_t *task, void *obj, void *data)
nxt_assert(port->pair[0] == -1);
nxt_assert(port->pair[1] == -1);
- nxt_assert(port->app_stream == 0);
+ nxt_assert(port->use_count == 0);
nxt_assert(port->app_link.next == NULL);
nxt_assert(nxt_queue_is_empty(&port->messages));
nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_streams));
nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_peers));
+ nxt_thread_mutex_destroy(&port->write_mutex);
+
nxt_mp_free(mp, port);
}
@@ -58,10 +60,12 @@ nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid,
port->pid = pid;
port->type = type;
port->mem_pool = mp;
+ port->use_count = 1;
nxt_mp_cleanup(mp, nxt_port_mp_cleanup, task, port, mp);
nxt_queue_init(&port->messages);
+ nxt_thread_mutex_create(&port->write_mutex);
} else {
nxt_mp_destroy(mp);
@@ -73,11 +77,11 @@ nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid,
}
-nxt_bool_t
-nxt_port_release(nxt_port_t *port)
+void
+nxt_port_close(nxt_task_t *task, nxt_port_t *port)
{
- nxt_thread_log_debug("port %p %d:%d release, type %d", port, port->pid,
- port->id, port->type);
+ nxt_debug(task, "port %p %d:%d close, type %d", port, port->pid,
+ port->id, port->type);
if (port->pair[0] != -1) {
nxt_fd_close(port->pair[0]);
@@ -87,21 +91,31 @@ nxt_port_release(nxt_port_t *port)
if (port->pair[1] != -1) {
nxt_fd_close(port->pair[1]);
port->pair[1] = -1;
- }
- if (port->type == NXT_PROCESS_WORKER) {
- if (nxt_router_app_remove_port(port) == 0) {
- return 0;
+ if (port->app != NULL) {
+ nxt_router_app_port_close(task, port);
}
}
+}
+
+
+static void
+nxt_port_release(nxt_task_t *task, nxt_port_t *port)
+{
+ nxt_debug(task, "port %p %d:%d release, type %d", port, port->pid,
+ port->id, port->type);
+
+ if (port->app != NULL) {
+ nxt_router_app_use(task, port->app, -1);
+
+ port->app = NULL;
+ }
if (port->link.next != NULL) {
nxt_process_port_remove(port);
}
nxt_mp_release(port->mem_pool, NULL);
-
- return 1;
}
@@ -263,7 +277,9 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
port->socket.task = task;
- nxt_runtime_port_add(rt, port);
+ nxt_runtime_port_add(task, port);
+
+ nxt_port_use(task, port, -1);
nxt_port_write_enable(task, port);
@@ -434,7 +450,7 @@ nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
process = nxt_runtime_process_find(rt, pid);
if (process) {
- nxt_runtime_process_remove(rt, process);
+ nxt_runtime_process_remove(task, process);
}
}
@@ -444,3 +460,90 @@ nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
nxt_debug(task, "port empty handler");
}
+
+
+typedef struct {
+ nxt_work_t work;
+ nxt_port_t *port;
+ nxt_port_post_handler_t handler;
+} nxt_port_work_t;
+
+
+static void
+nxt_port_post_handler(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_port_t *port;
+ nxt_port_work_t *pw;
+ nxt_port_post_handler_t handler;
+
+ pw = obj;
+ port = pw->port;
+ handler = pw->handler;
+
+ nxt_free(pw);
+
+ handler(task, port, data);
+
+ nxt_port_use(task, port, -1);
+}
+
+
+nxt_int_t
+nxt_port_post(nxt_task_t *task, nxt_port_t *port,
+ nxt_port_post_handler_t handler, void *data)
+{
+ nxt_port_work_t *pw;
+
+ if (task->thread->engine == port->engine) {
+ handler(task, port, data);
+
+ return NXT_OK;
+ }
+
+ pw = nxt_zalloc(sizeof(nxt_port_work_t));
+
+ if (nxt_slow_path(pw == NULL)) {
+ return NXT_ERROR;
+ }
+
+ nxt_atomic_fetch_add(&port->use_count, 1);
+
+ pw->work.handler = nxt_port_post_handler;
+ pw->work.task = &port->engine->task;
+ pw->work.obj = pw;
+ pw->work.data = data;
+
+ pw->port = port;
+ pw->handler = handler;
+
+ nxt_event_engine_post(port->engine, &pw->work);
+
+ return NXT_OK;
+}
+
+
+static void
+nxt_port_release_handler(nxt_task_t *task, nxt_port_t *port, void *data)
+{
+ /* no op */
+}
+
+
+void
+nxt_port_use(nxt_task_t *task, nxt_port_t *port, int i)
+{
+ int c;
+
+ c = nxt_atomic_fetch_add(&port->use_count, i);
+
+ if (i < 0 && c == -i) {
+
+ if (task->thread->engine == port->engine) {
+ nxt_port_release(task, port);
+
+ return;
+ }
+
+ nxt_port_post(task, port, nxt_port_release_handler, NULL);
+ }
+}