diff options
-rw-r--r-- | src/nxt_master_process.c | 49 | ||||
-rw-r--r-- | src/nxt_port.c | 8 | ||||
-rw-r--r-- | src/nxt_port.h | 3 | ||||
-rw-r--r-- | src/nxt_router.c | 248 | ||||
-rw-r--r-- | src/nxt_router.h | 3 | ||||
-rw-r--r-- | src/nxt_worker_process.c | 14 |
6 files changed, 148 insertions, 177 deletions
diff --git a/src/nxt_master_process.c b/src/nxt_master_process.c index 9e32785f..c35bb6b1 100644 --- a/src/nxt_master_process.c +++ b/src/nxt_master_process.c @@ -136,24 +136,32 @@ static nxt_conf_map_t nxt_common_app_conf[] = { static void nxt_port_master_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { - u_char *start; - size_t dump_size; - nxt_mp_t *mp; - nxt_int_t ret; - nxt_buf_t *b; - nxt_conf_value_t *conf; + nxt_buf_t *b; + + b = msg->buf; + + nxt_debug(task, "master data: %*s", b->mem.free - b->mem.pos, b->mem.pos); + + b->mem.pos = b->mem.free; +} + + +static void +nxt_port_master_start_worker_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + u_char *start; + nxt_mp_t *mp; + nxt_int_t ret; + nxt_buf_t *b; + nxt_conf_value_t *conf; nxt_common_app_conf_t app_conf; static nxt_str_t nobody = nxt_string("nobody"); b = msg->buf; - dump_size = b->mem.free - b->mem.pos; - if (dump_size > 300) { - dump_size = 300; - } - - nxt_debug(task, "master data: %*s", dump_size, b->mem.pos); + nxt_debug(task, "master start worker: %*s", b->mem.free - b->mem.pos, + b->mem.pos); mp = nxt_mp_create(1024, 128, 256, 32); @@ -193,13 +201,14 @@ nxt_port_master_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) static nxt_port_handler_t nxt_master_process_port_handlers[] = { - NULL, - nxt_port_new_port_handler, - NULL, - nxt_port_mmap_handler, + NULL, /* NXT_PORT_MSG_QUIT */ + NULL, /* NXT_PORT_MSG_NEW_PORT */ + NULL, /* NXT_PORT_MSG_CHANGE_FILE */ + NULL, /* NXT_PORT_MSG_MMAP */ nxt_port_master_data_handler, - NULL, + NULL, /* NXT_PORT_MSG_REMOVE_PID */ nxt_port_ready_handler, + nxt_port_master_start_worker_handler, nxt_port_rpc_handler, nxt_port_rpc_handler, }; @@ -634,6 +643,7 @@ nxt_master_process_sigchld_handler(nxt_task_t *task, void *obj, void *data) static void nxt_master_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid) { + nxt_buf_t *buf; nxt_port_t *port; nxt_runtime_t *rt; nxt_process_t *process; @@ -657,8 +667,11 @@ nxt_master_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid) port = nxt_process_port_first(process); + buf = nxt_buf_mem_alloc(port->mem_pool, sizeof(pid), 0); + buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(pid)); + nxt_port_socket_write(task, port, NXT_PORT_MSG_REMOVE_PID, - -1, pid, 0, NULL); + -1, init->stream, 0, buf); } nxt_runtime_process_loop; } diff --git a/src/nxt_port.c b/src/nxt_port.c index 27b5ddbe..681fec39 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -413,14 +413,20 @@ nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) void nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { + nxt_buf_t *buf; nxt_pid_t pid; nxt_runtime_t *rt; nxt_process_t *process; nxt_debug(task, "port remove pid handler"); + buf = msg->buf; + + nxt_assert(nxt_buf_used_size(buf) == sizeof(pid)); + + nxt_memcpy(&pid, buf->mem.pos, sizeof(pid)); + rt = task->thread->runtime; - pid = msg->port_msg.stream; nxt_port_rpc_remove_peer(task, msg->port, pid); diff --git a/src/nxt_port.h b/src/nxt_port.h index 1cbaaef1..8a5c3390 100644 --- a/src/nxt_port.h +++ b/src/nxt_port.h @@ -21,6 +21,7 @@ typedef enum { _NXT_PORT_MSG_DATA, _NXT_PORT_MSG_REMOVE_PID, _NXT_PORT_MSG_READY, + _NXT_PORT_MSG_START_WORKER, _NXT_PORT_MSG_RPC_READY, _NXT_PORT_MSG_RPC_ERROR, @@ -35,6 +36,8 @@ typedef enum { NXT_PORT_MSG_DATA_LAST = _NXT_PORT_MSG_DATA | NXT_PORT_MSG_LAST, NXT_PORT_MSG_REMOVE_PID = _NXT_PORT_MSG_REMOVE_PID | NXT_PORT_MSG_LAST, NXT_PORT_MSG_READY = _NXT_PORT_MSG_READY | NXT_PORT_MSG_LAST, + NXT_PORT_MSG_START_WORKER = _NXT_PORT_MSG_START_WORKER | + NXT_PORT_MSG_LAST, NXT_PORT_MSG_RPC_READY = _NXT_PORT_MSG_RPC_READY, NXT_PORT_MSG_RPC_READY_LAST = _NXT_PORT_MSG_RPC_READY | NXT_PORT_MSG_LAST, NXT_PORT_MSG_RPC_ERROR = _NXT_PORT_MSG_RPC_ERROR | NXT_PORT_MSG_LAST, diff --git a/src/nxt_router.c b/src/nxt_router.c index ecce72ac..40da942a 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -24,10 +24,8 @@ typedef struct nxt_req_app_link_s nxt_req_app_link_t; typedef struct nxt_start_worker_s nxt_start_worker_t; struct nxt_start_worker_s { - uint32_t stream; nxt_app_t *app; nxt_req_app_link_t *ra; - nxt_mp_t *mem_pool; nxt_work_t work; }; @@ -120,11 +118,6 @@ static nxt_port_t * nxt_router_app_get_port(nxt_app_t *app, uint32_t req_id); static void nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data); -static void nxt_router_sw_add(nxt_task_t *task, nxt_router_t *router, - nxt_start_worker_t *sw); -static nxt_start_worker_t *nxt_router_sw_find_remove(nxt_task_t *task, - nxt_router_t *router, uint32_t id); - static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data); @@ -174,28 +167,22 @@ nxt_router_start(nxt_task_t *task, void *data) static nxt_start_worker_t * -nxt_router_sw_create(nxt_task_t *task, nxt_mp_t *mp, nxt_app_t *app, - nxt_req_app_link_t *ra) +nxt_router_sw_create(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra) { nxt_port_t *master_port; nxt_runtime_t *rt; nxt_start_worker_t *sw; - sw = nxt_mp_retain(mp, sizeof(nxt_start_worker_t)); + sw = nxt_zalloc(sizeof(nxt_start_worker_t)); if (nxt_slow_path(sw == NULL)) { return NULL; } - nxt_memzero(sw, sizeof(nxt_start_worker_t)); - - sw->stream = nxt_random(&task->thread->random); - sw->mem_pool = mp; - sw->app = app; sw->ra = ra; - nxt_debug(task, "sw #%uxD create, request #%uD, app '%V' %p", sw->stream, + nxt_debug(task, "sw %p create, request #%uxD, app '%V' %p", sw, ra->req_id, &app->name, app); rt = task->thread->runtime; @@ -208,7 +195,7 @@ nxt_router_sw_create(nxt_task_t *task, nxt_mp_t *mp, nxt_app_t *app, sw->work.next = NULL; if (task->thread->engine != master_port->engine) { - nxt_debug(task, "post send sw %uxD to master engine %p", sw->stream, + nxt_debug(task, "sw %p post send to master engine %p", sw, master_port->engine); nxt_event_engine_post(master_port->engine, &sw->work); @@ -221,30 +208,12 @@ nxt_router_sw_create(nxt_task_t *task, nxt_mp_t *mp, nxt_app_t *app, } -static void -nxt_router_sw_release(nxt_task_t *task, void *obj, void *data) +nxt_inline void +nxt_router_sw_release(nxt_task_t *task, nxt_start_worker_t *sw) { - nxt_event_engine_t *engine; - nxt_start_worker_t *sw; - - sw = obj; - engine = data; - - if (task->thread->engine != engine) { - sw->work.handler = nxt_router_sw_release; - sw->work.task = &engine->task; - sw->work.next = NULL; - - nxt_debug(task, "sw #%uxD post release to %p", sw->stream, engine); - - nxt_event_engine_post(engine, &sw->work); + nxt_debug(task, "sw %p release", sw); - return; - } - - nxt_debug(task, "sw #%uxD release", sw->stream); - - nxt_mp_release(sw->mem_pool, sw); + nxt_free(sw); } @@ -321,30 +290,17 @@ nxt_router_ra_release(nxt_task_t *task, void *obj, void *data) void nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { - nxt_start_worker_t *sw; - nxt_port_new_port_handler(task, msg); - if (msg->new_port == NULL || msg->new_port->type != NXT_PROCESS_WORKER) { + if (msg->port_msg.stream == 0) { return; } - sw = nxt_router_sw_find_remove(task, nxt_router, msg->port_msg.stream); - - if (nxt_fast_path(sw != NULL)) { - msg->new_port->app = sw->app; - - nxt_assert(sw->app->pending_workers != 0); - - sw->app->pending_workers--; - sw->app->workers++; - - nxt_debug(task, "sw #%uxD got port %p", sw->stream, msg->new_port); - - nxt_router_app_release_port(task, msg->new_port, sw->app); - - nxt_router_sw_release(task, sw, sw->work.data); + if (msg->new_port == NULL || msg->new_port->type != NXT_PROCESS_WORKER) { + msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; } + + nxt_port_rpc_handler(task, msg); } @@ -392,6 +348,21 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) } +void +nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + nxt_port_remove_pid_handler(task, msg); + + if (msg->port_msg.stream == 0) { + return; + } + + msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; + + nxt_port_rpc_handler(task, msg); +} + + static nxt_router_temp_conf_t * nxt_router_temp_conf(nxt_task_t *task) { @@ -1401,13 +1372,15 @@ static void nxt_router_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); static nxt_port_handler_t nxt_router_app_port_handlers[] = { - NULL, - nxt_port_new_port_handler, - nxt_port_change_log_file_handler, - nxt_port_mmap_handler, + NULL, /* NXT_PORT_MSG_QUIT */ + NULL, /* NXT_PORT_MSG_NEW_PORT */ + NULL, /* NXT_PORT_MSG_CHANGE_FILE */ + /* TODO: remove mmap_handler from app ports */ + nxt_port_mmap_handler, /* NXT_PORT_MSG_MMAP */ nxt_router_app_data_handler, - nxt_port_remove_pid_handler, - NULL, + NULL, /* NXT_PORT_MSG_REMOVE_PID */ + NULL, /* NXT_PORT_MSG_READY */ + NULL, /* NXT_PORT_MSG_START_WORKER */ nxt_port_rpc_handler, nxt_port_rpc_handler, }; @@ -1938,12 +1911,54 @@ nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code, static void +nxt_router_sw_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) +{ + nxt_start_worker_t *sw; + + sw = data; + + nxt_assert(sw != NULL); + nxt_assert(sw->app->pending_workers != 0); + + msg->new_port->app = sw->app; + + sw->app->pending_workers--; + sw->app->workers++; + + nxt_debug(task, "sw %p got port %p", sw, msg->new_port); + + nxt_router_app_release_port(task, msg->new_port, sw->app); + + nxt_router_sw_release(task, sw); +} + + +static void +nxt_router_sw_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) +{ + nxt_start_worker_t *sw; + + sw = data; + + nxt_assert(sw != NULL); + nxt_assert(sw->app->pending_workers != 0); + + sw->app->pending_workers--; + + nxt_debug(task, "sw %p error, failed to start app '%V'", sw, &sw->app->name); + + nxt_router_sw_release(task, sw); +} + + +static void nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data) { size_t size; + uint32_t stream; nxt_buf_t *b; nxt_app_t *app; - nxt_port_t *port; + nxt_port_t *master_port, *router_port; nxt_runtime_t *rt; nxt_start_worker_t *sw; @@ -1955,43 +1970,45 @@ nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data) if (app->workers + app->pending_workers >= app->max_workers) { nxt_debug(task, "app '%V' %p %uD/%uD running/penging workers, " "post sw #%uxD release to %p", &app->name, app, - app->workers, app->pending_workers, sw->stream, - sw->work.data); + "sw %p release", &app->name, app, + app->workers, app->pending_workers, sw); - nxt_router_sw_release(task, sw, sw->work.data); + nxt_router_sw_release(task, sw); return; } app->pending_workers++; - nxt_debug(task, "sw #%uxD send", sw->stream); - - nxt_router_sw_add(task, nxt_router, sw); + nxt_debug(task, "sw %p send", sw); rt = task->thread->runtime; - port = rt->port_by_type[NXT_PROCESS_MASTER]; + master_port = rt->port_by_type[NXT_PROCESS_MASTER]; + router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; size = app->name.length + 1 + app->conf.length; - b = nxt_buf_mem_alloc(port->mem_pool, size, 0); + b = nxt_buf_mem_alloc(master_port->mem_pool, size, 0); nxt_buf_cpystr(b, &app->name); *b->mem.free++ = '\0'; nxt_buf_cpystr(b, &app->conf); - nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA_LAST, -1, - sw->stream, 0, b); + stream = nxt_port_rpc_register_handler(task, router_port, + nxt_router_sw_ready, + nxt_router_sw_error, + master_port->pid, sw); + + nxt_port_socket_write(task, master_port, NXT_PORT_MSG_START_WORKER, -1, + stream, router_port->id, b); } static nxt_bool_t nxt_router_app_free(nxt_task_t *task, nxt_app_t *app) { - nxt_port_t *master_port; - nxt_runtime_t *rt; - nxt_queue_link_t *lnk; - nxt_req_app_link_t *ra; + nxt_queue_link_t *lnk; + nxt_req_app_link_t *ra; nxt_thread_log_debug("app '%V' %p state: %d/%uD/%uD/%d", &app->name, app, app->live, app->workers, app->pending_workers, @@ -2015,10 +2032,7 @@ nxt_router_app_free(nxt_task_t *task, nxt_app_t *app) ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link); - rt = task->thread->runtime; - master_port = rt->port_by_type[NXT_PROCESS_MASTER]; - - nxt_router_sw_create(task, master_port->mem_pool, app, ra); + nxt_router_sw_create(task, app, ra); } return 0; @@ -2217,7 +2231,7 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra) return NXT_OK; } - sw = nxt_router_sw_create(task, c->mem_pool, app, ra); + sw = nxt_router_sw_create(task, app, ra); if (nxt_slow_path(sw == NULL)) { nxt_router_gen_error(task, c, 500, @@ -2627,71 +2641,3 @@ nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data) return nxt_value_at(nxt_msec_t, joint->socket_conf, data); } - - -static nxt_int_t -nxt_sw_test(nxt_lvlhsh_query_t *lhq, void *data) -{ - return NXT_OK; -} - - -static const nxt_lvlhsh_proto_t lvlhsh_sw_proto nxt_aligned(64) = { - NXT_LVLHSH_DEFAULT, - nxt_sw_test, - nxt_lvlhsh_alloc, - nxt_lvlhsh_free, -}; - - -static void -nxt_router_sw_add(nxt_task_t *task, nxt_router_t *router, - nxt_start_worker_t *sw) -{ - nxt_lvlhsh_query_t lhq; - - lhq.key_hash = nxt_murmur_hash2(&sw->stream, sizeof(sw->stream)); - lhq.key.length = sizeof(sw->stream); - lhq.key.start = (u_char *) &sw->stream; - lhq.proto = &lvlhsh_sw_proto; - lhq.replace = 0; - lhq.value = sw; - lhq.pool = task->thread->runtime->mem_pool; - - switch (nxt_lvlhsh_insert(&router->start_workers, &lhq)) { - - case NXT_OK: - break; - - default: - nxt_log_error(NXT_LOG_WARN, task->log, "stream %08uxD sw add failed", - sw->stream); - break; - } -} - - -static nxt_start_worker_t * -nxt_router_sw_find_remove(nxt_task_t *task, nxt_router_t *router, uint32_t id) -{ - nxt_lvlhsh_query_t lhq; - - lhq.key_hash = nxt_murmur_hash2(&id, sizeof(id)); - lhq.key.length = sizeof(id); - lhq.key.start = (u_char *) &id; - lhq.proto = &lvlhsh_sw_proto; - lhq.pool = task->thread->runtime->mem_pool; - - switch (nxt_lvlhsh_delete(&router->start_workers, &lhq)) { - - case NXT_OK: - return lhq.value; - - default: - nxt_log_error(NXT_LOG_WARN, task->log, "stream %08uxD sw remove failed", - id); - break; - } - - return NULL; -} diff --git a/src/nxt_router.h b/src/nxt_router.h index 5724c192..201b786c 100644 --- a/src/nxt_router.h +++ b/src/nxt_router.h @@ -20,8 +20,6 @@ typedef struct { nxt_queue_t sockets; /* of nxt_socket_conf_t */ nxt_queue_t apps; /* of nxt_app_t */ - - nxt_lvlhsh_t start_workers; /* stream to nxt_start_worker_t */ } nxt_router_t; @@ -124,6 +122,7 @@ typedef struct { void nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); void nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); +void nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); nxt_bool_t nxt_router_app_remove_port(nxt_port_t *port); diff --git a/src/nxt_worker_process.c b/src/nxt_worker_process.c index 83c63445..181d854e 100644 --- a/src/nxt_worker_process.c +++ b/src/nxt_worker_process.c @@ -29,7 +29,8 @@ nxt_port_handler_t nxt_controller_process_port_handlers[] = { nxt_port_mmap_handler, nxt_port_controller_data_handler, nxt_port_remove_pid_handler, - NULL, + NULL, /* NXT_PORT_MSG_READY */ + NULL, /* NXT_PORT_MSG_START_WORKER */ nxt_port_rpc_handler, nxt_port_rpc_handler, }; @@ -42,7 +43,8 @@ nxt_port_handler_t nxt_worker_process_port_handlers[] = { nxt_port_mmap_handler, nxt_port_data_handler, nxt_port_remove_pid_handler, - NULL, + NULL, /* NXT_PORT_MSG_READY */ + NULL, /* NXT_PORT_MSG_START_WORKER */ nxt_port_rpc_handler, nxt_port_rpc_handler, }; @@ -55,7 +57,8 @@ nxt_port_handler_t nxt_app_process_port_handlers[] = { nxt_port_mmap_handler, nxt_port_app_data_handler, nxt_port_remove_pid_handler, - NULL, + NULL, /* NXT_PORT_MSG_READY */ + NULL, /* NXT_PORT_MSG_START_WORKER */ nxt_port_rpc_handler, nxt_port_rpc_handler, }; @@ -67,8 +70,9 @@ nxt_port_handler_t nxt_router_process_port_handlers[] = { nxt_port_change_log_file_handler, nxt_port_mmap_handler, nxt_router_conf_data_handler, - nxt_port_remove_pid_handler, - NULL, + nxt_router_remove_pid_handler, + NULL, /* NXT_PORT_MSG_READY */ + NULL, /* NXT_PORT_MSG_START_WORKER */ nxt_port_rpc_handler, nxt_port_rpc_handler, }; |