diff options
Diffstat (limited to '')
-rw-r--r-- | src/nxt_router.c | 121 |
1 files changed, 98 insertions, 23 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c index 93b750a0..788199c7 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -27,6 +27,7 @@ typedef struct { uint32_t requests; nxt_conf_value_t *limits_value; nxt_conf_value_t *processes_value; + nxt_conf_value_t *targets_value; } nxt_router_app_conf_t; @@ -74,6 +75,9 @@ struct nxt_port_select_state_s { typedef struct nxt_port_select_state_s nxt_port_select_state_t; +static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, + nxt_mp_t *mp); +static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data); static void nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port); @@ -267,8 +271,8 @@ static const nxt_str_t *nxt_app_msg_prefix[] = { }; -nxt_port_handlers_t nxt_router_process_port_handlers = { - .quit = nxt_worker_process_quit_handler, +static const nxt_port_handlers_t nxt_router_process_port_handlers = { + .quit = nxt_signal_quit_handler, .new_port = nxt_router_new_port_handler, .change_file = nxt_port_change_log_file_handler, .mmap = nxt_port_mmap_handler, @@ -281,8 +285,29 @@ nxt_port_handlers_t nxt_router_process_port_handlers = { }; -nxt_int_t -nxt_router_start(nxt_task_t *task, void *data) +const nxt_process_init_t nxt_router_process = { + .name = "router", + .type = NXT_PROCESS_ROUTER, + .prefork = nxt_router_prefork, + .restart = 1, + .setup = nxt_process_core_setup, + .start = nxt_router_start, + .port_handlers = &nxt_router_process_port_handlers, + .signals = nxt_process_signals, +}; + + +static nxt_int_t +nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp) +{ + nxt_runtime_stop_app_processes(task, task->thread->runtime); + + return NXT_OK; +} + + +static nxt_int_t +nxt_router_start(nxt_task_t *task, nxt_process_data_t *data) { nxt_int_t ret; nxt_port_t *controller_port; @@ -291,6 +316,8 @@ nxt_router_start(nxt_task_t *task, void *data) rt = task->thread->runtime; + nxt_log(task, NXT_LOG_INFO, "router started"); + #if (NXT_TLS) rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL"); if (nxt_slow_path(rt->tls == NULL)) { @@ -381,8 +408,8 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, goto failed; } - ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1, - stream, port->id, b); + ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS, + -1, stream, port->id, b); if (nxt_slow_path(ret != NXT_OK)) { nxt_port_rpc_cancel(task, port, stream); @@ -861,7 +888,7 @@ nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) } if (msg->u.new_port == NULL - || msg->u.new_port->type != NXT_PROCESS_WORKER) + || msg->u.new_port->type != NXT_PROCESS_APP) { msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; } @@ -943,8 +970,10 @@ nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0) { - nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid, - msg->u.data); + if (nxt_fast_path(engine->port != NULL)) { + nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid, + msg->u.data); + } } nxt_queue_loop; @@ -1272,6 +1301,12 @@ static nxt_conf_map_t nxt_router_app_conf[] = { NXT_CONF_MAP_PTR, offsetof(nxt_router_app_conf_t, processes_value), }, + + { + nxt_string("targets"), + NXT_CONF_MAP_PTR, + offsetof(nxt_router_app_conf_t, targets_value), + }, }; @@ -1423,12 +1458,13 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, { u_char *p; size_t size; - nxt_mp_t *mp; - uint32_t next; + nxt_mp_t *mp, *app_mp; + uint32_t next, next_target; nxt_int_t ret; - nxt_str_t name, path; + nxt_str_t name, path, target; nxt_app_t *app, *prev; - nxt_str_t *t; + nxt_str_t *t, *s, *targets; + nxt_uint_t n, i; nxt_router_t *router; nxt_app_joint_t *app_joint; nxt_conf_value_t *conf, *http, *value, *websocket; @@ -1501,13 +1537,20 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, size = nxt_conf_json_length(application, NULL); - app = nxt_malloc(sizeof(nxt_app_t) + name.length + size); - if (app == NULL) { + app_mp = nxt_mp_create(4096, 128, 1024, 64); + if (nxt_slow_path(app_mp == NULL)) { goto fail; } + app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size); + if (app == NULL) { + goto app_fail; + } + nxt_memzero(app, sizeof(nxt_app_t)); + app->mem_pool = app_mp; + app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t)); app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t) + name.length); @@ -1522,7 +1565,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, prev = nxt_router_app_find(&router->apps, &name); if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) { - nxt_free(app); + nxt_mp_destroy(app_mp); nxt_queue_remove(&prev->link); nxt_queue_insert_tail(&tmcf->previous, &prev->link); @@ -1538,6 +1581,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, apcf.requests = 0; apcf.limits_value = NULL; apcf.processes_value = NULL; + apcf.targets_value = NULL; app_joint = nxt_malloc(sizeof(nxt_app_joint_t)); if (nxt_slow_path(app_joint == NULL)) { @@ -1587,6 +1631,30 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, apcf.spare_processes = apcf.processes; } + if (apcf.targets_value != NULL) { + n = nxt_conf_object_members_count(apcf.targets_value); + + targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n); + if (nxt_slow_path(targets == NULL)) { + goto app_fail; + } + + next_target = 0; + + for (i = 0; i < n; i++) { + (void) nxt_conf_next_object_member(apcf.targets_value, + &target, &next_target); + + s = nxt_str_dup(app_mp, &targets[i], &target); + if (nxt_slow_path(s == NULL)) { + goto app_fail; + } + } + + } else { + targets = NULL; + } + nxt_debug(task, "application type: %V", &apcf.type); nxt_debug(task, "application processes: %D", apcf.processes); nxt_debug(task, "application request timeout: %M", apcf.timeout); @@ -1628,6 +1696,8 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, app->max_pending_responses = 2; app->max_requests = apcf.requests; + app->targets = targets; + engine = task->thread->engine; app->engine = engine; @@ -1793,6 +1863,11 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, } } + ret = nxt_http_routes_resolve(task, tmcf); + if (nxt_slow_path(ret != NXT_OK)) { + goto fail; + } + value = nxt_conf_get_path(conf, &access_log_path); if (value != NULL) { @@ -1827,8 +1902,6 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, tmcf->router_conf->access_log = access_log; } - nxt_http_routes_resolve(task, tmcf); - nxt_queue_add(&tmcf->deleting, &router->sockets); nxt_queue_init(&router->sockets); @@ -1836,7 +1909,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, app_fail: - nxt_free(app); + nxt_mp_destroy(app_mp); fail: @@ -1844,7 +1917,7 @@ fail: nxt_queue_remove(&app->link); nxt_thread_mutex_destroy(&app->mutex); - nxt_free(app); + nxt_mp_destroy(app->mem_pool); } nxt_queue_loop; @@ -2353,8 +2426,8 @@ nxt_router_app_rpc_create(nxt_task_t *task, goto fail; } - ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1, - stream, router_port->id, b); + ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS, + -1, stream, router_port->id, b); if (nxt_slow_path(ret != NXT_OK)) { nxt_port_rpc_cancel(task, router_port, stream); @@ -4535,7 +4608,7 @@ nxt_router_free_app(nxt_task_t *task, void *obj, void *data) nxt_assert(nxt_queue_is_empty(&app->idle_ports)); nxt_thread_mutex_destroy(&app->mutex); - nxt_free(app); + nxt_mp_destroy(app->mem_pool); app_joint->app = NULL; @@ -4989,6 +5062,8 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r, req = (nxt_unit_request_t *) out->mem.free; out->mem.free += req_size; + req->app_target = r->app_target; + req->content_length = content_length; p = (u_char *) (req->fields + fields_count); |