diff options
Diffstat (limited to 'src/nxt_port.c')
-rw-r--r-- | src/nxt_port.c | 129 |
1 files changed, 116 insertions, 13 deletions
diff --git a/src/nxt_port.c b/src/nxt_port.c index d7f42012..3f7dc411 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -27,13 +27,15 @@ nxt_port_mp_cleanup(nxt_task_t *task, void *obj, void *data) nxt_assert(port->pair[0] == -1); nxt_assert(port->pair[1] == -1); - nxt_assert(port->app_stream == 0); + nxt_assert(port->use_count == 0); nxt_assert(port->app_link.next == NULL); nxt_assert(nxt_queue_is_empty(&port->messages)); nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_streams)); nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_peers)); + nxt_thread_mutex_destroy(&port->write_mutex); + nxt_mp_free(mp, port); } @@ -58,10 +60,12 @@ nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid, port->pid = pid; port->type = type; port->mem_pool = mp; + port->use_count = 1; nxt_mp_cleanup(mp, nxt_port_mp_cleanup, task, port, mp); nxt_queue_init(&port->messages); + nxt_thread_mutex_create(&port->write_mutex); } else { nxt_mp_destroy(mp); @@ -73,11 +77,11 @@ nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid, } -nxt_bool_t -nxt_port_release(nxt_port_t *port) +void +nxt_port_close(nxt_task_t *task, nxt_port_t *port) { - nxt_thread_log_debug("port %p %d:%d release, type %d", port, port->pid, - port->id, port->type); + nxt_debug(task, "port %p %d:%d close, type %d", port, port->pid, + port->id, port->type); if (port->pair[0] != -1) { nxt_fd_close(port->pair[0]); @@ -87,21 +91,31 @@ nxt_port_release(nxt_port_t *port) if (port->pair[1] != -1) { nxt_fd_close(port->pair[1]); port->pair[1] = -1; - } - if (port->type == NXT_PROCESS_WORKER) { - if (nxt_router_app_remove_port(port) == 0) { - return 0; + if (port->app != NULL) { + nxt_router_app_port_close(task, port); } } +} + + +static void +nxt_port_release(nxt_task_t *task, nxt_port_t *port) +{ + nxt_debug(task, "port %p %d:%d release, type %d", port, port->pid, + port->id, port->type); + + if (port->app != NULL) { + nxt_router_app_use(task, port->app, -1); + + port->app = NULL; + } if (port->link.next != NULL) { nxt_process_port_remove(port); } nxt_mp_release(port->mem_pool, NULL); - - return 1; } @@ -263,7 +277,9 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) port->socket.task = task; - nxt_runtime_port_add(rt, port); + nxt_runtime_port_add(task, port); + + nxt_port_use(task, port, -1); nxt_port_write_enable(task, port); @@ -434,7 +450,7 @@ nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) process = nxt_runtime_process_find(rt, pid); if (process) { - nxt_runtime_process_remove(rt, process); + nxt_runtime_process_remove(task, process); } } @@ -444,3 +460,90 @@ nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { nxt_debug(task, "port empty handler"); } + + +typedef struct { + nxt_work_t work; + nxt_port_t *port; + nxt_port_post_handler_t handler; +} nxt_port_work_t; + + +static void +nxt_port_post_handler(nxt_task_t *task, void *obj, void *data) +{ + nxt_port_t *port; + nxt_port_work_t *pw; + nxt_port_post_handler_t handler; + + pw = obj; + port = pw->port; + handler = pw->handler; + + nxt_free(pw); + + handler(task, port, data); + + nxt_port_use(task, port, -1); +} + + +nxt_int_t +nxt_port_post(nxt_task_t *task, nxt_port_t *port, + nxt_port_post_handler_t handler, void *data) +{ + nxt_port_work_t *pw; + + if (task->thread->engine == port->engine) { + handler(task, port, data); + + return NXT_OK; + } + + pw = nxt_zalloc(sizeof(nxt_port_work_t)); + + if (nxt_slow_path(pw == NULL)) { + return NXT_ERROR; + } + + nxt_atomic_fetch_add(&port->use_count, 1); + + pw->work.handler = nxt_port_post_handler; + pw->work.task = &port->engine->task; + pw->work.obj = pw; + pw->work.data = data; + + pw->port = port; + pw->handler = handler; + + nxt_event_engine_post(port->engine, &pw->work); + + return NXT_OK; +} + + +static void +nxt_port_release_handler(nxt_task_t *task, nxt_port_t *port, void *data) +{ + /* no op */ +} + + +void +nxt_port_use(nxt_task_t *task, nxt_port_t *port, int i) +{ + int c; + + c = nxt_atomic_fetch_add(&port->use_count, i); + + if (i < 0 && c == -i) { + + if (task->thread->engine == port->engine) { + nxt_port_release(task, port); + + return; + } + + nxt_port_post(task, port, nxt_port_release_handler, NULL); + } +} |