diff options
Diffstat (limited to 'src/nxt_router.c')
-rw-r--r-- | src/nxt_router.c | 357 |
1 files changed, 226 insertions, 131 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c index 39d375f8..7623ccbb 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -27,7 +27,6 @@ typedef struct { uint32_t spare_processes; nxt_msec_t timeout; nxt_msec_t idle_timeout; - uint32_t requests; nxt_conf_value_t *limits_value; nxt_conf_value_t *processes_value; nxt_conf_value_t *targets_value; @@ -66,12 +65,14 @@ typedef struct { typedef struct { nxt_app_t *app; nxt_router_temp_conf_t *temp_conf; + uint8_t proto; /* 1 bit */ } nxt_app_rpc_t; typedef struct { nxt_app_joint_t *app_joint; uint32_t generation; + uint8_t proto; /* 1 bit */ } nxt_app_joint_rpc_t; @@ -228,8 +229,8 @@ static void nxt_router_app_port_error(nxt_task_t *task, static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i); static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app); -static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, - nxt_apr_action_t action); +static void nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app, + nxt_port_t *port, nxt_apr_action_t action); static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app, nxt_request_rpc_data_t *req_rpc_data); static void nxt_router_http_request_error(nxt_task_t *task, void *obj, @@ -393,32 +394,52 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, { size_t size; uint32_t stream; - nxt_mp_t *mp; nxt_int_t ret; nxt_app_t *app; nxt_buf_t *b; - nxt_port_t *main_port; + nxt_port_t *dport; nxt_runtime_t *rt; nxt_app_joint_rpc_t *app_joint_rpc; app = data; - rt = task->thread->runtime; - main_port = rt->port_by_type[NXT_PROCESS_MAIN]; + nxt_thread_mutex_lock(&app->mutex); - nxt_debug(task, "app '%V' %p start process", &app->name, app); + dport = app->proto_port; - size = app->name.length + 1 + app->conf.length; + nxt_thread_mutex_unlock(&app->mutex); - b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size); + if (dport != NULL) { + nxt_debug(task, "app '%V' %p start process", &app->name, app); - if (nxt_slow_path(b == NULL)) { - goto failed; - } + b = NULL; - nxt_buf_cpystr(b, &app->name); - *b->mem.free++ = '\0'; - nxt_buf_cpystr(b, &app->conf); + } else { + if (app->proto_port_requests > 0) { + nxt_debug(task, "app '%V' %p wait for prototype process", + &app->name, app); + + app->proto_port_requests++; + + goto skip; + } + + nxt_debug(task, "app '%V' %p start prototype process", &app->name, app); + + rt = task->thread->runtime; + dport = rt->port_by_type[NXT_PROCESS_MAIN]; + + size = app->name.length + 1 + app->conf.length; + + b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, size, 0); + if (nxt_slow_path(b == NULL)) { + goto failed; + } + + nxt_buf_cpystr(b, &app->name); + *b->mem.free++ = '\0'; + nxt_buf_cpystr(b, &app->conf); + } app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port, nxt_router_app_port_ready, @@ -430,7 +451,7 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, stream = nxt_port_rpc_ex_stream(app_joint_rpc); - ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS, + ret = nxt_port_socket_write(task, dport, NXT_PORT_MSG_START_PROCESS, -1, stream, port->id, b); if (nxt_slow_path(ret != NXT_OK)) { nxt_port_rpc_cancel(task, port, stream); @@ -440,26 +461,23 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, app_joint_rpc->app_joint = app->joint; app_joint_rpc->generation = app->generation; + app_joint_rpc->proto = (b != NULL); - nxt_router_app_joint_use(task, app->joint, 1); + if (b != NULL) { + app->proto_port_requests++; - nxt_router_app_use(task, app, -1); + b = NULL; + } - return; + nxt_router_app_joint_use(task, app->joint, 1); failed: if (b != NULL) { - mp = b->data; - nxt_mp_free(mp, b); - nxt_mp_release(mp); + nxt_mp_free(b->data, b); } - nxt_thread_mutex_lock(&app->mutex); - - app->pending_processes--; - - nxt_thread_mutex_unlock(&app->mutex); +skip: nxt_router_app_use(task, app, -1); } @@ -583,14 +601,15 @@ nxt_request_rpc_data_unlink(nxt_task_t *task, nxt_router_msg_cancel(task, req_rpc_data); + app = req_rpc_data->app; + if (req_rpc_data->app_port != NULL) { - nxt_router_app_port_release(task, req_rpc_data->app_port, + nxt_router_app_port_release(task, app, req_rpc_data->app_port, req_rpc_data->apr_action); req_rpc_data->app_port = NULL; } - app = req_rpc_data->app; r = req_rpc_data->request; if (r != NULL) { @@ -658,6 +677,12 @@ nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_router_greet_controller(task, msg->u.new_port); } + if (port != NULL && port->type == NXT_PROCESS_PROTOTYPE) { + nxt_port_rpc_handler(task, msg); + + return; + } + if (port == NULL || port->type != NXT_PROCESS_APP) { if (msg->port_msg.stream == 0) { @@ -683,6 +708,8 @@ nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) return; } + nxt_debug(task, "new port id %d (%d)", port->id, port->type); + /* * Port with "id == 0" is application 'main' port and it always * should come with non-zero stream. @@ -819,7 +846,8 @@ nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_app_t *app; nxt_int_t ret; nxt_str_t app_name; - nxt_port_t *port, *reply_port, *shared_port, *old_shared_port; + nxt_port_t *reply_port, *shared_port, *old_shared_port; + nxt_port_t *proto_port; nxt_port_msg_type_t reply; reply_port = nxt_runtime_port_find(task->thread->runtime, @@ -862,12 +890,15 @@ nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_thread_mutex_lock(&app->mutex); - nxt_queue_each(port, &app->ports, nxt_port_t, app_link) { + proto_port = app->proto_port; - (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, - 0, 0, NULL); + if (proto_port != NULL) { + nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name, + proto_port->pid); - } nxt_queue_loop; + app->proto_port = NULL; + proto_port->app = NULL; + } app->generation++; @@ -883,6 +914,15 @@ nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_port_close(task, old_shared_port); nxt_port_use(task, old_shared_port, -1); + if (proto_port != NULL) { + (void) nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT, + -1, 0, 0, NULL); + + nxt_port_close(task, proto_port); + + nxt_port_use(task, proto_port, -1); + } + reply = NXT_PORT_MSG_RPC_READY_LAST; } else { @@ -1292,12 +1332,6 @@ static nxt_conf_map_t nxt_router_app_limits_conf[] = { NXT_CONF_MAP_MSEC, offsetof(nxt_router_app_conf_t, timeout), }, - - { - nxt_string("requests"), - NXT_CONF_MAP_INT32, - offsetof(nxt_router_app_conf_t, requests), - }, }; @@ -1566,7 +1600,6 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, apcf.spare_processes = 0; apcf.timeout = 0; apcf.idle_timeout = 15000; - apcf.requests = 0; apcf.limits_value = NULL; apcf.processes_value = NULL; apcf.targets_value = NULL; @@ -1646,7 +1679,6 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_debug(task, "application type: %V", &apcf.type); nxt_debug(task, "application processes: %D", apcf.processes); nxt_debug(task, "application request timeout: %M", apcf.timeout); - nxt_debug(task, "application requests: %D", apcf.requests); lang = nxt_app_lang_module(task->thread->runtime, &apcf.type); @@ -1677,7 +1709,6 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, ? apcf.spare_processes : 1; app->timeout = apcf.timeout; app->idle_timeout = apcf.idle_timeout; - app->max_requests = apcf.requests; app->targets = targets; @@ -2744,54 +2775,67 @@ nxt_router_app_rpc_create(nxt_task_t *task, uint32_t stream; nxt_int_t ret; nxt_buf_t *b; - nxt_port_t *main_port, *router_port; + nxt_port_t *router_port, *dport; nxt_runtime_t *rt; nxt_app_rpc_t *rpc; - rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_app_rpc_t)); - if (rpc == NULL) { - goto fail; - } + rt = task->thread->runtime; - rpc->app = app; - rpc->temp_conf = tmcf; + dport = app->proto_port; - nxt_debug(task, "app '%V' prefork", &app->name); + if (dport == NULL) { + nxt_debug(task, "app '%V' prototype prefork", &app->name); - size = app->name.length + 1 + app->conf.length; + size = app->name.length + 1 + app->conf.length; - b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); - if (nxt_slow_path(b == NULL)) { - goto fail; - } + b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); + if (nxt_slow_path(b == NULL)) { + goto fail; + } - b->completion_handler = nxt_buf_dummy_completion; + b->completion_handler = nxt_buf_dummy_completion; - nxt_buf_cpystr(b, &app->name); - *b->mem.free++ = '\0'; - nxt_buf_cpystr(b, &app->conf); + nxt_buf_cpystr(b, &app->name); + *b->mem.free++ = '\0'; + nxt_buf_cpystr(b, &app->conf); + + dport = rt->port_by_type[NXT_PROCESS_MAIN]; + + } else { + nxt_debug(task, "app '%V' prefork", &app->name); + + b = NULL; + } - rt = task->thread->runtime; - main_port = rt->port_by_type[NXT_PROCESS_MAIN]; router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; - stream = nxt_port_rpc_register_handler(task, router_port, + rpc = nxt_port_rpc_register_handler_ex(task, router_port, nxt_router_app_prefork_ready, nxt_router_app_prefork_error, - -1, rpc); - if (nxt_slow_path(stream == 0)) { + sizeof(nxt_app_rpc_t)); + if (nxt_slow_path(rpc == NULL)) { goto fail; } - ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS, - -1, stream, router_port->id, b); + rpc->app = app; + rpc->temp_conf = tmcf; + rpc->proto = (b != NULL); + + stream = nxt_port_rpc_ex_stream(rpc); + ret = nxt_port_socket_write(task, dport, + NXT_PORT_MSG_START_PROCESS, + -1, stream, router_port->id, b); if (nxt_slow_path(ret != NXT_OK)) { nxt_port_rpc_cancel(task, router_port, stream); goto fail; } - app->pending_processes++; + if (b == NULL) { + nxt_port_rpc_ex_set_peer(task, router_port, rpc, dport->pid); + + app->pending_processes++; + } return; @@ -2816,9 +2860,24 @@ nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, port = msg->u.new_port; nxt_assert(port != NULL); - nxt_assert(port->type == NXT_PROCESS_APP); nxt_assert(port->id == 0); + if (rpc->proto) { + nxt_assert(app->proto_port == NULL); + nxt_assert(port->type == NXT_PROCESS_PROTOTYPE); + + nxt_port_inc_use(port); + + app->proto_port = port; + port->app = app; + + nxt_router_app_rpc_create(task, rpc->temp_conf, app); + + return; + } + + nxt_assert(port->type == NXT_PROCESS_APP); + port->app = app; port->main_app_port = port; @@ -2860,10 +2919,16 @@ nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, app = rpc->app; tmcf = rpc->temp_conf; - nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"", - &app->name); + if (rpc->proto) { + nxt_log(task, NXT_LOG_WARN, "failed to start prototype \"%V\"", + &app->name); - app->pending_processes--; + } else { + nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"", + &app->name); + + app->pending_processes--; + } nxt_router_conf_error(task, tmcf); } @@ -4211,7 +4276,7 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_thread_mutex_unlock(&app->mutex); - nxt_router_app_port_release(task, app_port, NXT_APR_UPGRADE); + nxt_router_app_port_release(task, app, app_port, NXT_APR_UPGRADE); req_rpc_data->apr_action = NXT_APR_CLOSE; nxt_debug(task, "stream #%uD upgrade", req_rpc_data->stream); @@ -4422,8 +4487,9 @@ static void nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) { + uint32_t n; nxt_app_t *app; - nxt_bool_t start_process; + nxt_bool_t start_process, restarted; nxt_port_t *port; nxt_app_joint_t *app_joint; nxt_app_joint_rpc_t *app_joint_rpc; @@ -4436,7 +4502,6 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_assert(app_joint != NULL); nxt_assert(port != NULL); - nxt_assert(port->type == NXT_PROCESS_APP); nxt_assert(port->id == 0); app = app_joint->app; @@ -4453,11 +4518,51 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_thread_mutex_lock(&app->mutex); + restarted = (app->generation != app_joint_rpc->generation); + + if (app_joint_rpc->proto) { + nxt_assert(app->proto_port == NULL); + nxt_assert(port->type == NXT_PROCESS_PROTOTYPE); + + n = app->proto_port_requests; + app->proto_port_requests = 0; + + if (nxt_slow_path(restarted)) { + nxt_thread_mutex_unlock(&app->mutex); + + nxt_debug(task, "proto port ready for restarted app, send QUIT"); + + nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, + NULL); + + } else { + port->app = app; + app->proto_port = port; + + nxt_thread_mutex_unlock(&app->mutex); + + nxt_port_use(task, port, 1); + } + + port = task->thread->runtime->port_by_type[NXT_PROCESS_ROUTER]; + + while (n > 0) { + nxt_router_app_use(task, app, 1); + + nxt_router_start_app_process_handler(task, port, app); + + n--; + } + + return; + } + + nxt_assert(port->type == NXT_PROCESS_APP); nxt_assert(app->pending_processes != 0); app->pending_processes--; - if (nxt_slow_path(app->generation != app_joint_rpc->generation)) { + if (nxt_slow_path(restarted)) { nxt_debug(task, "new port ready for restarted app, send QUIT"); start_process = !task->thread->engine->shutdown @@ -4493,7 +4598,7 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_router_app_shared_port_send(task, port); - nxt_router_app_port_release(task, port, NXT_APR_NEW_PORT); + nxt_router_app_port_release(task, app, port, NXT_APR_NEW_PORT); } @@ -4600,7 +4705,6 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, } - nxt_inline nxt_port_t * nxt_router_app_get_port_for_quit(nxt_task_t *task, nxt_app_t *app) { @@ -4670,19 +4774,15 @@ nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app) static void -nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, +nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app, nxt_port_t *port, nxt_apr_action_t action) { int inc_use; uint32_t got_response, dec_requests; - nxt_app_t *app; - nxt_bool_t port_unchained, send_quit, adjust_idle_timer; + nxt_bool_t adjust_idle_timer; nxt_port_t *main_app_port; nxt_assert(port != NULL); - nxt_assert(port->app != NULL); - - app = port->app; inc_use = 0; got_response = 0; @@ -4725,40 +4825,18 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, nxt_thread_mutex_lock(&app->mutex); - main_app_port->app_responses += got_response; main_app_port->active_requests -= got_response + dec_requests; app->active_requests -= got_response + dec_requests; - if (main_app_port->pair[1] != -1 - && (app->max_requests == 0 - || main_app_port->app_responses < app->max_requests)) - { - if (main_app_port->app_link.next == NULL) { - nxt_queue_insert_tail(&app->ports, &main_app_port->app_link); + if (main_app_port->pair[1] != -1 && main_app_port->app_link.next == NULL) { + nxt_queue_insert_tail(&app->ports, &main_app_port->app_link); - nxt_port_inc_use(main_app_port); - } - } - - send_quit = (app->max_requests > 0 - && main_app_port->app_responses >= app->max_requests); - - if (send_quit) { - port_unchained = nxt_queue_chk_remove(&main_app_port->app_link); - - nxt_port_hash_remove(&app->port_hash, main_app_port); - app->port_hash_count--; - - main_app_port->app = NULL; - app->processes--; - - } else { - port_unchained = 0; + nxt_port_inc_use(main_app_port); } adjust_idle_timer = 0; - if (main_app_port->pair[1] != -1 && !send_quit + if (main_app_port->pair[1] != -1 && main_app_port->active_requests == 0 && main_app_port->active_websockets == 0 && main_app_port->idle_link.next == NULL) @@ -4803,19 +4881,6 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, goto adjust_use; } - if (send_quit) { - nxt_debug(task, "app '%V' %p send QUIT to port", &app->name, app); - - nxt_port_socket_write(task, main_app_port, NXT_PORT_MSG_QUIT, -1, 0, 0, - NULL); - - if (port_unchained) { - nxt_port_use(task, main_app_port, -1); - } - - goto adjust_use; - } - nxt_debug(task, "app '%V' %p requests queue is empty, keep the port", &app->name, app); @@ -4839,6 +4904,20 @@ nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port) nxt_thread_mutex_lock(&app->mutex); + if (port == app->proto_port) { + app->proto_port = NULL; + port->app = NULL; + + nxt_thread_mutex_unlock(&app->mutex); + + nxt_debug(task, "app '%V' prototype pid %PI closed", &app->name, + port->pid); + + nxt_port_use(task, port, -1); + + return; + } + nxt_port_hash_remove(&app->port_hash, port); app->port_hash_count--; @@ -5027,7 +5106,7 @@ static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data) { nxt_app_t *app; - nxt_port_t *port; + nxt_port_t *port, *proto_port; nxt_app_joint_t *app_joint; app_joint = obj; @@ -5039,10 +5118,6 @@ nxt_router_free_app(nxt_task_t *task, void *obj, void *data) break; } - nxt_debug(task, "send QUIT to app '%V' pid %PI", &app->name, port->pid); - - nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); - nxt_port_use(task, port, -1); } @@ -5063,8 +5138,28 @@ nxt_router_free_app(nxt_task_t *task, void *obj, void *data) nxt_port_use(task, port, -1); } + proto_port = app->proto_port; + + if (proto_port != NULL) { + nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name, + proto_port->pid); + + app->proto_port = NULL; + proto_port->app = NULL; + } + nxt_thread_mutex_unlock(&app->mutex); + if (proto_port != NULL) { + nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT, + -1, 0, 0, NULL); + + nxt_port_close(task, proto_port); + + nxt_port_use(task, proto_port, -1); + } + + nxt_assert(app->proto_port == NULL); nxt_assert(app->processes == 0); nxt_assert(app->active_requests == 0); nxt_assert(app->port_hash_count == 0); @@ -5501,8 +5596,8 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r, *p++ = '\0'; } - req->query_length = r->args != NULL ? (uint32_t) r->args->length : 0; - if (r->args != NULL && r->args->start != NULL) { + req->query_length = (uint32_t) r->args->length; + if (r->args->start != NULL) { query_pos = nxt_pointer_to(target_pos, r->args->start - r->target.start); |