summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_router.c
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2017-08-02 13:20:53 +0300
committerMax Romanov <max.romanov@nginx.com>2017-08-02 13:20:53 +0300
commit82c0304ab8f4fb2d406fe709639bee535b45d888 (patch)
treefcb6ecc27ce6ee620fba9cd3e55cc0e5942d3011 /src/nxt_router.c
parentc93d2c76235ae6823ee59b1072338258897ae2d7 (diff)
downloadunit-82c0304ab8f4fb2d406fe709639bee535b45d888.tar.gz
unit-82c0304ab8f4fb2d406fe709639bee535b45d888.tar.bz2
Using port rpc in router->master start worker request.
Diffstat (limited to '')
-rw-r--r--src/nxt_router.c248
1 files changed, 97 insertions, 151 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c
index ecce72ac..40da942a 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -24,10 +24,8 @@ typedef struct nxt_req_app_link_s nxt_req_app_link_t;
typedef struct nxt_start_worker_s nxt_start_worker_t;
struct nxt_start_worker_s {
- uint32_t stream;
nxt_app_t *app;
nxt_req_app_link_t *ra;
- nxt_mp_t *mem_pool;
nxt_work_t work;
};
@@ -120,11 +118,6 @@ static nxt_port_t * nxt_router_app_get_port(nxt_app_t *app, uint32_t req_id);
static void nxt_router_app_release_port(nxt_task_t *task, void *obj,
void *data);
-static void nxt_router_sw_add(nxt_task_t *task, nxt_router_t *router,
- nxt_start_worker_t *sw);
-static nxt_start_worker_t *nxt_router_sw_find_remove(nxt_task_t *task,
- nxt_router_t *router, uint32_t id);
-
static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj,
void *data);
@@ -174,28 +167,22 @@ nxt_router_start(nxt_task_t *task, void *data)
static nxt_start_worker_t *
-nxt_router_sw_create(nxt_task_t *task, nxt_mp_t *mp, nxt_app_t *app,
- nxt_req_app_link_t *ra)
+nxt_router_sw_create(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra)
{
nxt_port_t *master_port;
nxt_runtime_t *rt;
nxt_start_worker_t *sw;
- sw = nxt_mp_retain(mp, sizeof(nxt_start_worker_t));
+ sw = nxt_zalloc(sizeof(nxt_start_worker_t));
if (nxt_slow_path(sw == NULL)) {
return NULL;
}
- nxt_memzero(sw, sizeof(nxt_start_worker_t));
-
- sw->stream = nxt_random(&task->thread->random);
- sw->mem_pool = mp;
-
sw->app = app;
sw->ra = ra;
- nxt_debug(task, "sw #%uxD create, request #%uD, app '%V' %p", sw->stream,
+ nxt_debug(task, "sw %p create, request #%uxD, app '%V' %p", sw,
ra->req_id, &app->name, app);
rt = task->thread->runtime;
@@ -208,7 +195,7 @@ nxt_router_sw_create(nxt_task_t *task, nxt_mp_t *mp, nxt_app_t *app,
sw->work.next = NULL;
if (task->thread->engine != master_port->engine) {
- nxt_debug(task, "post send sw %uxD to master engine %p", sw->stream,
+ nxt_debug(task, "sw %p post send to master engine %p", sw,
master_port->engine);
nxt_event_engine_post(master_port->engine, &sw->work);
@@ -221,30 +208,12 @@ nxt_router_sw_create(nxt_task_t *task, nxt_mp_t *mp, nxt_app_t *app,
}
-static void
-nxt_router_sw_release(nxt_task_t *task, void *obj, void *data)
+nxt_inline void
+nxt_router_sw_release(nxt_task_t *task, nxt_start_worker_t *sw)
{
- nxt_event_engine_t *engine;
- nxt_start_worker_t *sw;
-
- sw = obj;
- engine = data;
-
- if (task->thread->engine != engine) {
- sw->work.handler = nxt_router_sw_release;
- sw->work.task = &engine->task;
- sw->work.next = NULL;
-
- nxt_debug(task, "sw #%uxD post release to %p", sw->stream, engine);
-
- nxt_event_engine_post(engine, &sw->work);
+ nxt_debug(task, "sw %p release", sw);
- return;
- }
-
- nxt_debug(task, "sw #%uxD release", sw->stream);
-
- nxt_mp_release(sw->mem_pool, sw);
+ nxt_free(sw);
}
@@ -321,30 +290,17 @@ nxt_router_ra_release(nxt_task_t *task, void *obj, void *data)
void
nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
- nxt_start_worker_t *sw;
-
nxt_port_new_port_handler(task, msg);
- if (msg->new_port == NULL || msg->new_port->type != NXT_PROCESS_WORKER) {
+ if (msg->port_msg.stream == 0) {
return;
}
- sw = nxt_router_sw_find_remove(task, nxt_router, msg->port_msg.stream);
-
- if (nxt_fast_path(sw != NULL)) {
- msg->new_port->app = sw->app;
-
- nxt_assert(sw->app->pending_workers != 0);
-
- sw->app->pending_workers--;
- sw->app->workers++;
-
- nxt_debug(task, "sw #%uxD got port %p", sw->stream, msg->new_port);
-
- nxt_router_app_release_port(task, msg->new_port, sw->app);
-
- nxt_router_sw_release(task, sw, sw->work.data);
+ if (msg->new_port == NULL || msg->new_port->type != NXT_PROCESS_WORKER) {
+ msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
}
+
+ nxt_port_rpc_handler(task, msg);
}
@@ -392,6 +348,21 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
}
+void
+nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
+{
+ nxt_port_remove_pid_handler(task, msg);
+
+ if (msg->port_msg.stream == 0) {
+ return;
+ }
+
+ msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
+
+ nxt_port_rpc_handler(task, msg);
+}
+
+
static nxt_router_temp_conf_t *
nxt_router_temp_conf(nxt_task_t *task)
{
@@ -1401,13 +1372,15 @@ static void
nxt_router_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
static nxt_port_handler_t nxt_router_app_port_handlers[] = {
- NULL,
- nxt_port_new_port_handler,
- nxt_port_change_log_file_handler,
- nxt_port_mmap_handler,
+ NULL, /* NXT_PORT_MSG_QUIT */
+ NULL, /* NXT_PORT_MSG_NEW_PORT */
+ NULL, /* NXT_PORT_MSG_CHANGE_FILE */
+ /* TODO: remove mmap_handler from app ports */
+ nxt_port_mmap_handler, /* NXT_PORT_MSG_MMAP */
nxt_router_app_data_handler,
- nxt_port_remove_pid_handler,
- NULL,
+ NULL, /* NXT_PORT_MSG_REMOVE_PID */
+ NULL, /* NXT_PORT_MSG_READY */
+ NULL, /* NXT_PORT_MSG_START_WORKER */
nxt_port_rpc_handler,
nxt_port_rpc_handler,
};
@@ -1938,12 +1911,54 @@ nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code,
static void
+nxt_router_sw_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data)
+{
+ nxt_start_worker_t *sw;
+
+ sw = data;
+
+ nxt_assert(sw != NULL);
+ nxt_assert(sw->app->pending_workers != 0);
+
+ msg->new_port->app = sw->app;
+
+ sw->app->pending_workers--;
+ sw->app->workers++;
+
+ nxt_debug(task, "sw %p got port %p", sw, msg->new_port);
+
+ nxt_router_app_release_port(task, msg->new_port, sw->app);
+
+ nxt_router_sw_release(task, sw);
+}
+
+
+static void
+nxt_router_sw_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data)
+{
+ nxt_start_worker_t *sw;
+
+ sw = data;
+
+ nxt_assert(sw != NULL);
+ nxt_assert(sw->app->pending_workers != 0);
+
+ sw->app->pending_workers--;
+
+ nxt_debug(task, "sw %p error, failed to start app '%V'", sw, &sw->app->name);
+
+ nxt_router_sw_release(task, sw);
+}
+
+
+static void
nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data)
{
size_t size;
+ uint32_t stream;
nxt_buf_t *b;
nxt_app_t *app;
- nxt_port_t *port;
+ nxt_port_t *master_port, *router_port;
nxt_runtime_t *rt;
nxt_start_worker_t *sw;
@@ -1955,43 +1970,45 @@ nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data)
if (app->workers + app->pending_workers >= app->max_workers) {
nxt_debug(task, "app '%V' %p %uD/%uD running/penging workers, "
"post sw #%uxD release to %p", &app->name, app,
- app->workers, app->pending_workers, sw->stream,
- sw->work.data);
+ "sw %p release", &app->name, app,
+ app->workers, app->pending_workers, sw);
- nxt_router_sw_release(task, sw, sw->work.data);
+ nxt_router_sw_release(task, sw);
return;
}
app->pending_workers++;
- nxt_debug(task, "sw #%uxD send", sw->stream);
-
- nxt_router_sw_add(task, nxt_router, sw);
+ nxt_debug(task, "sw %p send", sw);
rt = task->thread->runtime;
- port = rt->port_by_type[NXT_PROCESS_MASTER];
+ master_port = rt->port_by_type[NXT_PROCESS_MASTER];
+ router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
size = app->name.length + 1 + app->conf.length;
- b = nxt_buf_mem_alloc(port->mem_pool, size, 0);
+ b = nxt_buf_mem_alloc(master_port->mem_pool, size, 0);
nxt_buf_cpystr(b, &app->name);
*b->mem.free++ = '\0';
nxt_buf_cpystr(b, &app->conf);
- nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA_LAST, -1,
- sw->stream, 0, b);
+ stream = nxt_port_rpc_register_handler(task, router_port,
+ nxt_router_sw_ready,
+ nxt_router_sw_error,
+ master_port->pid, sw);
+
+ nxt_port_socket_write(task, master_port, NXT_PORT_MSG_START_WORKER, -1,
+ stream, router_port->id, b);
}
static nxt_bool_t
nxt_router_app_free(nxt_task_t *task, nxt_app_t *app)
{
- nxt_port_t *master_port;
- nxt_runtime_t *rt;
- nxt_queue_link_t *lnk;
- nxt_req_app_link_t *ra;
+ nxt_queue_link_t *lnk;
+ nxt_req_app_link_t *ra;
nxt_thread_log_debug("app '%V' %p state: %d/%uD/%uD/%d", &app->name, app,
app->live, app->workers, app->pending_workers,
@@ -2015,10 +2032,7 @@ nxt_router_app_free(nxt_task_t *task, nxt_app_t *app)
ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link);
- rt = task->thread->runtime;
- master_port = rt->port_by_type[NXT_PROCESS_MASTER];
-
- nxt_router_sw_create(task, master_port->mem_pool, app, ra);
+ nxt_router_sw_create(task, app, ra);
}
return 0;
@@ -2217,7 +2231,7 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
return NXT_OK;
}
- sw = nxt_router_sw_create(task, c->mem_pool, app, ra);
+ sw = nxt_router_sw_create(task, app, ra);
if (nxt_slow_path(sw == NULL)) {
nxt_router_gen_error(task, c, 500,
@@ -2627,71 +2641,3 @@ nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data)
return nxt_value_at(nxt_msec_t, joint->socket_conf, data);
}
-
-
-static nxt_int_t
-nxt_sw_test(nxt_lvlhsh_query_t *lhq, void *data)
-{
- return NXT_OK;
-}
-
-
-static const nxt_lvlhsh_proto_t lvlhsh_sw_proto nxt_aligned(64) = {
- NXT_LVLHSH_DEFAULT,
- nxt_sw_test,
- nxt_lvlhsh_alloc,
- nxt_lvlhsh_free,
-};
-
-
-static void
-nxt_router_sw_add(nxt_task_t *task, nxt_router_t *router,
- nxt_start_worker_t *sw)
-{
- nxt_lvlhsh_query_t lhq;
-
- lhq.key_hash = nxt_murmur_hash2(&sw->stream, sizeof(sw->stream));
- lhq.key.length = sizeof(sw->stream);
- lhq.key.start = (u_char *) &sw->stream;
- lhq.proto = &lvlhsh_sw_proto;
- lhq.replace = 0;
- lhq.value = sw;
- lhq.pool = task->thread->runtime->mem_pool;
-
- switch (nxt_lvlhsh_insert(&router->start_workers, &lhq)) {
-
- case NXT_OK:
- break;
-
- default:
- nxt_log_error(NXT_LOG_WARN, task->log, "stream %08uxD sw add failed",
- sw->stream);
- break;
- }
-}
-
-
-static nxt_start_worker_t *
-nxt_router_sw_find_remove(nxt_task_t *task, nxt_router_t *router, uint32_t id)
-{
- nxt_lvlhsh_query_t lhq;
-
- lhq.key_hash = nxt_murmur_hash2(&id, sizeof(id));
- lhq.key.length = sizeof(id);
- lhq.key.start = (u_char *) &id;
- lhq.proto = &lvlhsh_sw_proto;
- lhq.pool = task->thread->runtime->mem_pool;
-
- switch (nxt_lvlhsh_delete(&router->start_workers, &lhq)) {
-
- case NXT_OK:
- return lhq.value;
-
- default:
- nxt_log_error(NXT_LOG_WARN, task->log, "stream %08uxD sw remove failed",
- id);
- break;
- }
-
- return NULL;
-}