summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/nxt_controller.c2
-rw-r--r--src/nxt_port.c4
-rw-r--r--src/nxt_port.h6
-rw-r--r--src/nxt_router.c104
4 files changed, 30 insertions, 86 deletions
diff --git a/src/nxt_controller.c b/src/nxt_controller.c
index 4a0d065b..d4d8023e 100644
--- a/src/nxt_controller.c
+++ b/src/nxt_controller.c
@@ -189,7 +189,7 @@ nxt_controller_process_new_port_handler(nxt_task_t *task,
nxt_port_new_port_handler(task, msg);
- if (msg->new_port->type != NXT_PROCESS_ROUTER) {
+ if (msg->u.new_port->type != NXT_PROCESS_ROUTER) {
return;
}
diff --git a/src/nxt_port.c b/src/nxt_port.c
index 3f7dc411..948e4de6 100644
--- a/src/nxt_port.c
+++ b/src/nxt_port.c
@@ -283,7 +283,7 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_port_write_enable(task, port);
- msg->new_port = port;
+ msg->u.new_port = port;
}
@@ -441,6 +441,8 @@ nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_memcpy(&pid, buf->mem.pos, sizeof(pid));
+ msg->u.removed_pid = pid;
+
nxt_debug(task, "port remove pid %PI handler", pid);
rt = task->thread->runtime;
diff --git a/src/nxt_port.h b/src/nxt_port.h
index fdb5a561..09ad6367 100644
--- a/src/nxt_port.h
+++ b/src/nxt_port.h
@@ -127,7 +127,11 @@ struct nxt_port_recv_msg_s {
nxt_port_t *port;
nxt_port_msg_t port_msg;
size_t size;
- nxt_port_t *new_port;
+ union {
+ nxt_port_t *new_port;
+ nxt_pid_t removed_pid;
+ void *data;
+ } u;
};
typedef struct nxt_app_s nxt_app_t;
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 33b8d65e..b3da0eaa 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -62,23 +62,11 @@ typedef struct {
} nxt_socket_rpc_t;
-typedef struct {
- nxt_mp_t *mem_pool;
- nxt_port_recv_msg_t msg;
- nxt_work_t work;
-} nxt_remove_pid_msg_t;
-
-
static nxt_int_t nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app);
static void nxt_router_ra_error(nxt_task_t *task, nxt_req_app_link_t *ra,
int code, const char* str);
-static void nxt_router_worker_remove_pid_handler(nxt_task_t *task, void *obj,
- void *data);
-static void nxt_router_worker_remove_pid_done(nxt_task_t *task, void *obj,
- void *data);
-
static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conf_ready(nxt_task_t *task,
@@ -572,7 +560,9 @@ nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
return;
}
- if (msg->new_port == NULL || msg->new_port->type != NXT_PROCESS_WORKER) {
+ if (msg->u.new_port == NULL ||
+ msg->u.new_port->type != NXT_PROCESS_WORKER)
+ {
msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
}
@@ -621,13 +611,24 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
}
+static void
+nxt_router_worker_remove_pid(nxt_task_t *task, nxt_port_t *port, void *data)
+{
+ union {
+ nxt_pid_t removed_pid;
+ void *data;
+ } u;
+
+ u.data = data;
+
+ nxt_port_rpc_remove_peer(task, port, u.removed_pid);
+}
+
+
void
nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
- nxt_mp_t *mp;
- nxt_buf_t *buf;
- nxt_event_engine_t *engine;
- nxt_remove_pid_msg_t *rp;
+ nxt_event_engine_t *engine;
nxt_port_remove_pid_handler(task, msg);
@@ -635,82 +636,19 @@ nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
return;
}
- mp = nxt_mp_create(1024, 128, 256, 32);
-
- buf = nxt_buf_mem_alloc(mp, nxt_buf_used_size(msg->buf), 0);
- buf->mem.free = nxt_cpymem(buf->mem.free, msg->buf->mem.pos,
- nxt_buf_used_size(msg->buf));
-
nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
{
- rp = nxt_mp_retain(mp, sizeof(nxt_remove_pid_msg_t));
-
- rp->mem_pool = mp;
-
- rp->msg.fd = msg->fd;
- rp->msg.buf = buf;
- rp->msg.port = engine->port;
- rp->msg.port_msg = msg->port_msg;
- rp->msg.size = msg->size;
- rp->msg.new_port = NULL;
-
- rp->work.handler = nxt_router_worker_remove_pid_handler;
- rp->work.task = &engine->task;
- rp->work.obj = rp;
- rp->work.data = task->thread->engine;
- rp->work.next = NULL;
-
- nxt_event_engine_post(engine, &rp->work);
+ nxt_port_post(task, engine->port, nxt_router_worker_remove_pid,
+ msg->u.data);
}
nxt_queue_loop;
- nxt_mp_release(mp, NULL);
-
msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
nxt_port_rpc_handler(task, msg);
}
-static void
-nxt_router_worker_remove_pid_handler(nxt_task_t *task, void *obj, void *data)
-{
- nxt_pid_t pid;
- nxt_buf_t *buf;
- nxt_event_engine_t *engine;
- nxt_remove_pid_msg_t *rp;
-
- rp = obj;
-
- buf = rp->msg.buf;
-
- nxt_assert(nxt_buf_used_size(buf) == sizeof(pid));
-
- nxt_memcpy(&pid, buf->mem.pos, sizeof(pid));
-
- nxt_port_rpc_remove_peer(task, rp->msg.port, pid);
-
- engine = rp->work.data;
-
- rp->work.handler = nxt_router_worker_remove_pid_done;
- rp->work.task = &engine->task;
- rp->work.next = NULL;
-
- nxt_event_engine_post(engine, &rp->work);
-}
-
-
-static void
-nxt_router_worker_remove_pid_done(nxt_task_t *task, void *obj, void *data)
-{
- nxt_remove_pid_msg_t *rp;
-
- rp = obj;
-
- nxt_mp_release(rp->mem_pool, rp);
-}
-
-
static nxt_router_temp_conf_t *
nxt_router_temp_conf(nxt_task_t *task)
{
@@ -2528,7 +2466,7 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_port_t *port;
app = data;
- port = msg->new_port;
+ port = msg->u.new_port;
nxt_assert(app != NULL);
nxt_assert(port != NULL);