diff options
author | Max Romanov <max.romanov@nginx.com> | 2017-08-02 13:20:53 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2017-08-02 13:20:53 +0300 |
commit | 82c0304ab8f4fb2d406fe709639bee535b45d888 (patch) | |
tree | fcb6ecc27ce6ee620fba9cd3e55cc0e5942d3011 /src/nxt_router.c | |
parent | c93d2c76235ae6823ee59b1072338258897ae2d7 (diff) | |
download | unit-82c0304ab8f4fb2d406fe709639bee535b45d888.tar.gz unit-82c0304ab8f4fb2d406fe709639bee535b45d888.tar.bz2 |
Using port rpc in router->master start worker request.
Diffstat (limited to '')
-rw-r--r-- | src/nxt_router.c | 248 |
1 files changed, 97 insertions, 151 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c index ecce72ac..40da942a 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -24,10 +24,8 @@ typedef struct nxt_req_app_link_s nxt_req_app_link_t; typedef struct nxt_start_worker_s nxt_start_worker_t; struct nxt_start_worker_s { - uint32_t stream; nxt_app_t *app; nxt_req_app_link_t *ra; - nxt_mp_t *mem_pool; nxt_work_t work; }; @@ -120,11 +118,6 @@ static nxt_port_t * nxt_router_app_get_port(nxt_app_t *app, uint32_t req_id); static void nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data); -static void nxt_router_sw_add(nxt_task_t *task, nxt_router_t *router, - nxt_start_worker_t *sw); -static nxt_start_worker_t *nxt_router_sw_find_remove(nxt_task_t *task, - nxt_router_t *router, uint32_t id); - static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data); @@ -174,28 +167,22 @@ nxt_router_start(nxt_task_t *task, void *data) static nxt_start_worker_t * -nxt_router_sw_create(nxt_task_t *task, nxt_mp_t *mp, nxt_app_t *app, - nxt_req_app_link_t *ra) +nxt_router_sw_create(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra) { nxt_port_t *master_port; nxt_runtime_t *rt; nxt_start_worker_t *sw; - sw = nxt_mp_retain(mp, sizeof(nxt_start_worker_t)); + sw = nxt_zalloc(sizeof(nxt_start_worker_t)); if (nxt_slow_path(sw == NULL)) { return NULL; } - nxt_memzero(sw, sizeof(nxt_start_worker_t)); - - sw->stream = nxt_random(&task->thread->random); - sw->mem_pool = mp; - sw->app = app; sw->ra = ra; - nxt_debug(task, "sw #%uxD create, request #%uD, app '%V' %p", sw->stream, + nxt_debug(task, "sw %p create, request #%uxD, app '%V' %p", sw, ra->req_id, &app->name, app); rt = task->thread->runtime; @@ -208,7 +195,7 @@ nxt_router_sw_create(nxt_task_t *task, nxt_mp_t *mp, nxt_app_t *app, sw->work.next = NULL; if (task->thread->engine != master_port->engine) { - nxt_debug(task, "post send sw %uxD to master engine %p", sw->stream, + nxt_debug(task, "sw %p post send to master engine %p", sw, master_port->engine); nxt_event_engine_post(master_port->engine, &sw->work); @@ -221,30 +208,12 @@ nxt_router_sw_create(nxt_task_t *task, nxt_mp_t *mp, nxt_app_t *app, } -static void -nxt_router_sw_release(nxt_task_t *task, void *obj, void *data) +nxt_inline void +nxt_router_sw_release(nxt_task_t *task, nxt_start_worker_t *sw) { - nxt_event_engine_t *engine; - nxt_start_worker_t *sw; - - sw = obj; - engine = data; - - if (task->thread->engine != engine) { - sw->work.handler = nxt_router_sw_release; - sw->work.task = &engine->task; - sw->work.next = NULL; - - nxt_debug(task, "sw #%uxD post release to %p", sw->stream, engine); - - nxt_event_engine_post(engine, &sw->work); + nxt_debug(task, "sw %p release", sw); - return; - } - - nxt_debug(task, "sw #%uxD release", sw->stream); - - nxt_mp_release(sw->mem_pool, sw); + nxt_free(sw); } @@ -321,30 +290,17 @@ nxt_router_ra_release(nxt_task_t *task, void *obj, void *data) void nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { - nxt_start_worker_t *sw; - nxt_port_new_port_handler(task, msg); - if (msg->new_port == NULL || msg->new_port->type != NXT_PROCESS_WORKER) { + if (msg->port_msg.stream == 0) { return; } - sw = nxt_router_sw_find_remove(task, nxt_router, msg->port_msg.stream); - - if (nxt_fast_path(sw != NULL)) { - msg->new_port->app = sw->app; - - nxt_assert(sw->app->pending_workers != 0); - - sw->app->pending_workers--; - sw->app->workers++; - - nxt_debug(task, "sw #%uxD got port %p", sw->stream, msg->new_port); - - nxt_router_app_release_port(task, msg->new_port, sw->app); - - nxt_router_sw_release(task, sw, sw->work.data); + if (msg->new_port == NULL || msg->new_port->type != NXT_PROCESS_WORKER) { + msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; } + + nxt_port_rpc_handler(task, msg); } @@ -392,6 +348,21 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) } +void +nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + nxt_port_remove_pid_handler(task, msg); + + if (msg->port_msg.stream == 0) { + return; + } + + msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; + + nxt_port_rpc_handler(task, msg); +} + + static nxt_router_temp_conf_t * nxt_router_temp_conf(nxt_task_t *task) { @@ -1401,13 +1372,15 @@ static void nxt_router_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); static nxt_port_handler_t nxt_router_app_port_handlers[] = { - NULL, - nxt_port_new_port_handler, - nxt_port_change_log_file_handler, - nxt_port_mmap_handler, + NULL, /* NXT_PORT_MSG_QUIT */ + NULL, /* NXT_PORT_MSG_NEW_PORT */ + NULL, /* NXT_PORT_MSG_CHANGE_FILE */ + /* TODO: remove mmap_handler from app ports */ + nxt_port_mmap_handler, /* NXT_PORT_MSG_MMAP */ nxt_router_app_data_handler, - nxt_port_remove_pid_handler, - NULL, + NULL, /* NXT_PORT_MSG_REMOVE_PID */ + NULL, /* NXT_PORT_MSG_READY */ + NULL, /* NXT_PORT_MSG_START_WORKER */ nxt_port_rpc_handler, nxt_port_rpc_handler, }; @@ -1938,12 +1911,54 @@ nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code, static void +nxt_router_sw_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) +{ + nxt_start_worker_t *sw; + + sw = data; + + nxt_assert(sw != NULL); + nxt_assert(sw->app->pending_workers != 0); + + msg->new_port->app = sw->app; + + sw->app->pending_workers--; + sw->app->workers++; + + nxt_debug(task, "sw %p got port %p", sw, msg->new_port); + + nxt_router_app_release_port(task, msg->new_port, sw->app); + + nxt_router_sw_release(task, sw); +} + + +static void +nxt_router_sw_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) +{ + nxt_start_worker_t *sw; + + sw = data; + + nxt_assert(sw != NULL); + nxt_assert(sw->app->pending_workers != 0); + + sw->app->pending_workers--; + + nxt_debug(task, "sw %p error, failed to start app '%V'", sw, &sw->app->name); + + nxt_router_sw_release(task, sw); +} + + +static void nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data) { size_t size; + uint32_t stream; nxt_buf_t *b; nxt_app_t *app; - nxt_port_t *port; + nxt_port_t *master_port, *router_port; nxt_runtime_t *rt; nxt_start_worker_t *sw; @@ -1955,43 +1970,45 @@ nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data) if (app->workers + app->pending_workers >= app->max_workers) { nxt_debug(task, "app '%V' %p %uD/%uD running/penging workers, " "post sw #%uxD release to %p", &app->name, app, - app->workers, app->pending_workers, sw->stream, - sw->work.data); + "sw %p release", &app->name, app, + app->workers, app->pending_workers, sw); - nxt_router_sw_release(task, sw, sw->work.data); + nxt_router_sw_release(task, sw); return; } app->pending_workers++; - nxt_debug(task, "sw #%uxD send", sw->stream); - - nxt_router_sw_add(task, nxt_router, sw); + nxt_debug(task, "sw %p send", sw); rt = task->thread->runtime; - port = rt->port_by_type[NXT_PROCESS_MASTER]; + master_port = rt->port_by_type[NXT_PROCESS_MASTER]; + router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; size = app->name.length + 1 + app->conf.length; - b = nxt_buf_mem_alloc(port->mem_pool, size, 0); + b = nxt_buf_mem_alloc(master_port->mem_pool, size, 0); nxt_buf_cpystr(b, &app->name); *b->mem.free++ = '\0'; nxt_buf_cpystr(b, &app->conf); - nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA_LAST, -1, - sw->stream, 0, b); + stream = nxt_port_rpc_register_handler(task, router_port, + nxt_router_sw_ready, + nxt_router_sw_error, + master_port->pid, sw); + + nxt_port_socket_write(task, master_port, NXT_PORT_MSG_START_WORKER, -1, + stream, router_port->id, b); } static nxt_bool_t nxt_router_app_free(nxt_task_t *task, nxt_app_t *app) { - nxt_port_t *master_port; - nxt_runtime_t *rt; - nxt_queue_link_t *lnk; - nxt_req_app_link_t *ra; + nxt_queue_link_t *lnk; + nxt_req_app_link_t *ra; nxt_thread_log_debug("app '%V' %p state: %d/%uD/%uD/%d", &app->name, app, app->live, app->workers, app->pending_workers, @@ -2015,10 +2032,7 @@ nxt_router_app_free(nxt_task_t *task, nxt_app_t *app) ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link); - rt = task->thread->runtime; - master_port = rt->port_by_type[NXT_PROCESS_MASTER]; - - nxt_router_sw_create(task, master_port->mem_pool, app, ra); + nxt_router_sw_create(task, app, ra); } return 0; @@ -2217,7 +2231,7 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra) return NXT_OK; } - sw = nxt_router_sw_create(task, c->mem_pool, app, ra); + sw = nxt_router_sw_create(task, app, ra); if (nxt_slow_path(sw == NULL)) { nxt_router_gen_error(task, c, 500, @@ -2627,71 +2641,3 @@ nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data) return nxt_value_at(nxt_msec_t, joint->socket_conf, data); } - - -static nxt_int_t -nxt_sw_test(nxt_lvlhsh_query_t *lhq, void *data) -{ - return NXT_OK; -} - - -static const nxt_lvlhsh_proto_t lvlhsh_sw_proto nxt_aligned(64) = { - NXT_LVLHSH_DEFAULT, - nxt_sw_test, - nxt_lvlhsh_alloc, - nxt_lvlhsh_free, -}; - - -static void -nxt_router_sw_add(nxt_task_t *task, nxt_router_t *router, - nxt_start_worker_t *sw) -{ - nxt_lvlhsh_query_t lhq; - - lhq.key_hash = nxt_murmur_hash2(&sw->stream, sizeof(sw->stream)); - lhq.key.length = sizeof(sw->stream); - lhq.key.start = (u_char *) &sw->stream; - lhq.proto = &lvlhsh_sw_proto; - lhq.replace = 0; - lhq.value = sw; - lhq.pool = task->thread->runtime->mem_pool; - - switch (nxt_lvlhsh_insert(&router->start_workers, &lhq)) { - - case NXT_OK: - break; - - default: - nxt_log_error(NXT_LOG_WARN, task->log, "stream %08uxD sw add failed", - sw->stream); - break; - } -} - - -static nxt_start_worker_t * -nxt_router_sw_find_remove(nxt_task_t *task, nxt_router_t *router, uint32_t id) -{ - nxt_lvlhsh_query_t lhq; - - lhq.key_hash = nxt_murmur_hash2(&id, sizeof(id)); - lhq.key.length = sizeof(id); - lhq.key.start = (u_char *) &id; - lhq.proto = &lvlhsh_sw_proto; - lhq.pool = task->thread->runtime->mem_pool; - - switch (nxt_lvlhsh_delete(&router->start_workers, &lhq)) { - - case NXT_OK: - return lhq.value; - - default: - nxt_log_error(NXT_LOG_WARN, task->log, "stream %08uxD sw remove failed", - id); - break; - } - - return NULL; -} |