diff options
-rw-r--r-- | src/nxt_controller.c | 2 | ||||
-rw-r--r-- | src/nxt_port.c | 4 | ||||
-rw-r--r-- | src/nxt_port.h | 6 | ||||
-rw-r--r-- | src/nxt_router.c | 104 |
4 files changed, 30 insertions, 86 deletions
diff --git a/src/nxt_controller.c b/src/nxt_controller.c index 4a0d065b..d4d8023e 100644 --- a/src/nxt_controller.c +++ b/src/nxt_controller.c @@ -189,7 +189,7 @@ nxt_controller_process_new_port_handler(nxt_task_t *task, nxt_port_new_port_handler(task, msg); - if (msg->new_port->type != NXT_PROCESS_ROUTER) { + if (msg->u.new_port->type != NXT_PROCESS_ROUTER) { return; } diff --git a/src/nxt_port.c b/src/nxt_port.c index 3f7dc411..948e4de6 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -283,7 +283,7 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_port_write_enable(task, port); - msg->new_port = port; + msg->u.new_port = port; } @@ -441,6 +441,8 @@ nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_memcpy(&pid, buf->mem.pos, sizeof(pid)); + msg->u.removed_pid = pid; + nxt_debug(task, "port remove pid %PI handler", pid); rt = task->thread->runtime; diff --git a/src/nxt_port.h b/src/nxt_port.h index fdb5a561..09ad6367 100644 --- a/src/nxt_port.h +++ b/src/nxt_port.h @@ -127,7 +127,11 @@ struct nxt_port_recv_msg_s { nxt_port_t *port; nxt_port_msg_t port_msg; size_t size; - nxt_port_t *new_port; + union { + nxt_port_t *new_port; + nxt_pid_t removed_pid; + void *data; + } u; }; typedef struct nxt_app_s nxt_app_t; diff --git a/src/nxt_router.c b/src/nxt_router.c index 33b8d65e..b3da0eaa 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -62,23 +62,11 @@ typedef struct { } nxt_socket_rpc_t; -typedef struct { - nxt_mp_t *mem_pool; - nxt_port_recv_msg_t msg; - nxt_work_t work; -} nxt_remove_pid_msg_t; - - static nxt_int_t nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app); static void nxt_router_ra_error(nxt_task_t *task, nxt_req_app_link_t *ra, int code, const char* str); -static void nxt_router_worker_remove_pid_handler(nxt_task_t *task, void *obj, - void *data); -static void nxt_router_worker_remove_pid_done(nxt_task_t *task, void *obj, - void *data); - static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task); static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data); static void nxt_router_conf_ready(nxt_task_t *task, @@ -572,7 +560,9 @@ nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) return; } - if (msg->new_port == NULL || msg->new_port->type != NXT_PROCESS_WORKER) { + if (msg->u.new_port == NULL || + msg->u.new_port->type != NXT_PROCESS_WORKER) + { msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; } @@ -621,13 +611,24 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) } +static void +nxt_router_worker_remove_pid(nxt_task_t *task, nxt_port_t *port, void *data) +{ + union { + nxt_pid_t removed_pid; + void *data; + } u; + + u.data = data; + + nxt_port_rpc_remove_peer(task, port, u.removed_pid); +} + + void nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { - nxt_mp_t *mp; - nxt_buf_t *buf; - nxt_event_engine_t *engine; - nxt_remove_pid_msg_t *rp; + nxt_event_engine_t *engine; nxt_port_remove_pid_handler(task, msg); @@ -635,82 +636,19 @@ nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) return; } - mp = nxt_mp_create(1024, 128, 256, 32); - - buf = nxt_buf_mem_alloc(mp, nxt_buf_used_size(msg->buf), 0); - buf->mem.free = nxt_cpymem(buf->mem.free, msg->buf->mem.pos, - nxt_buf_used_size(msg->buf)); - nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0) { - rp = nxt_mp_retain(mp, sizeof(nxt_remove_pid_msg_t)); - - rp->mem_pool = mp; - - rp->msg.fd = msg->fd; - rp->msg.buf = buf; - rp->msg.port = engine->port; - rp->msg.port_msg = msg->port_msg; - rp->msg.size = msg->size; - rp->msg.new_port = NULL; - - rp->work.handler = nxt_router_worker_remove_pid_handler; - rp->work.task = &engine->task; - rp->work.obj = rp; - rp->work.data = task->thread->engine; - rp->work.next = NULL; - - nxt_event_engine_post(engine, &rp->work); + nxt_port_post(task, engine->port, nxt_router_worker_remove_pid, + msg->u.data); } nxt_queue_loop; - nxt_mp_release(mp, NULL); - msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; nxt_port_rpc_handler(task, msg); } -static void -nxt_router_worker_remove_pid_handler(nxt_task_t *task, void *obj, void *data) -{ - nxt_pid_t pid; - nxt_buf_t *buf; - nxt_event_engine_t *engine; - nxt_remove_pid_msg_t *rp; - - rp = obj; - - buf = rp->msg.buf; - - nxt_assert(nxt_buf_used_size(buf) == sizeof(pid)); - - nxt_memcpy(&pid, buf->mem.pos, sizeof(pid)); - - nxt_port_rpc_remove_peer(task, rp->msg.port, pid); - - engine = rp->work.data; - - rp->work.handler = nxt_router_worker_remove_pid_done; - rp->work.task = &engine->task; - rp->work.next = NULL; - - nxt_event_engine_post(engine, &rp->work); -} - - -static void -nxt_router_worker_remove_pid_done(nxt_task_t *task, void *obj, void *data) -{ - nxt_remove_pid_msg_t *rp; - - rp = obj; - - nxt_mp_release(rp->mem_pool, rp); -} - - static nxt_router_temp_conf_t * nxt_router_temp_conf(nxt_task_t *task) { @@ -2528,7 +2466,7 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_port_t *port; app = data; - port = msg->new_port; + port = msg->u.new_port; nxt_assert(app != NULL); nxt_assert(port != NULL); |