diff options
Diffstat (limited to '')
-rw-r--r-- | src/nxt_router.c | 104 |
1 files changed, 21 insertions, 83 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c index 33b8d65e..b3da0eaa 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -62,23 +62,11 @@ typedef struct { } nxt_socket_rpc_t; -typedef struct { - nxt_mp_t *mem_pool; - nxt_port_recv_msg_t msg; - nxt_work_t work; -} nxt_remove_pid_msg_t; - - static nxt_int_t nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app); static void nxt_router_ra_error(nxt_task_t *task, nxt_req_app_link_t *ra, int code, const char* str); -static void nxt_router_worker_remove_pid_handler(nxt_task_t *task, void *obj, - void *data); -static void nxt_router_worker_remove_pid_done(nxt_task_t *task, void *obj, - void *data); - static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task); static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data); static void nxt_router_conf_ready(nxt_task_t *task, @@ -572,7 +560,9 @@ nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) return; } - if (msg->new_port == NULL || msg->new_port->type != NXT_PROCESS_WORKER) { + if (msg->u.new_port == NULL || + msg->u.new_port->type != NXT_PROCESS_WORKER) + { msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; } @@ -621,13 +611,24 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) } +static void +nxt_router_worker_remove_pid(nxt_task_t *task, nxt_port_t *port, void *data) +{ + union { + nxt_pid_t removed_pid; + void *data; + } u; + + u.data = data; + + nxt_port_rpc_remove_peer(task, port, u.removed_pid); +} + + void nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { - nxt_mp_t *mp; - nxt_buf_t *buf; - nxt_event_engine_t *engine; - nxt_remove_pid_msg_t *rp; + nxt_event_engine_t *engine; nxt_port_remove_pid_handler(task, msg); @@ -635,82 +636,19 @@ nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) return; } - mp = nxt_mp_create(1024, 128, 256, 32); - - buf = nxt_buf_mem_alloc(mp, nxt_buf_used_size(msg->buf), 0); - buf->mem.free = nxt_cpymem(buf->mem.free, msg->buf->mem.pos, - nxt_buf_used_size(msg->buf)); - nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0) { - rp = nxt_mp_retain(mp, sizeof(nxt_remove_pid_msg_t)); - - rp->mem_pool = mp; - - rp->msg.fd = msg->fd; - rp->msg.buf = buf; - rp->msg.port = engine->port; - rp->msg.port_msg = msg->port_msg; - rp->msg.size = msg->size; - rp->msg.new_port = NULL; - - rp->work.handler = nxt_router_worker_remove_pid_handler; - rp->work.task = &engine->task; - rp->work.obj = rp; - rp->work.data = task->thread->engine; - rp->work.next = NULL; - - nxt_event_engine_post(engine, &rp->work); + nxt_port_post(task, engine->port, nxt_router_worker_remove_pid, + msg->u.data); } nxt_queue_loop; - nxt_mp_release(mp, NULL); - msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; nxt_port_rpc_handler(task, msg); } -static void -nxt_router_worker_remove_pid_handler(nxt_task_t *task, void *obj, void *data) -{ - nxt_pid_t pid; - nxt_buf_t *buf; - nxt_event_engine_t *engine; - nxt_remove_pid_msg_t *rp; - - rp = obj; - - buf = rp->msg.buf; - - nxt_assert(nxt_buf_used_size(buf) == sizeof(pid)); - - nxt_memcpy(&pid, buf->mem.pos, sizeof(pid)); - - nxt_port_rpc_remove_peer(task, rp->msg.port, pid); - - engine = rp->work.data; - - rp->work.handler = nxt_router_worker_remove_pid_done; - rp->work.task = &engine->task; - rp->work.next = NULL; - - nxt_event_engine_post(engine, &rp->work); -} - - -static void -nxt_router_worker_remove_pid_done(nxt_task_t *task, void *obj, void *data) -{ - nxt_remove_pid_msg_t *rp; - - rp = obj; - - nxt_mp_release(rp->mem_pool, rp); -} - - static nxt_router_temp_conf_t * nxt_router_temp_conf(nxt_task_t *task) { @@ -2528,7 +2466,7 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_port_t *port; app = data; - port = msg->new_port; + port = msg->u.new_port; nxt_assert(app != NULL); nxt_assert(port != NULL); |