summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_router.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nxt_router.c')
-rw-r--r--src/nxt_router.c357
1 files changed, 226 insertions, 131 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 39d375f8..7623ccbb 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -27,7 +27,6 @@ typedef struct {
uint32_t spare_processes;
nxt_msec_t timeout;
nxt_msec_t idle_timeout;
- uint32_t requests;
nxt_conf_value_t *limits_value;
nxt_conf_value_t *processes_value;
nxt_conf_value_t *targets_value;
@@ -66,12 +65,14 @@ typedef struct {
typedef struct {
nxt_app_t *app;
nxt_router_temp_conf_t *temp_conf;
+ uint8_t proto; /* 1 bit */
} nxt_app_rpc_t;
typedef struct {
nxt_app_joint_t *app_joint;
uint32_t generation;
+ uint8_t proto; /* 1 bit */
} nxt_app_joint_rpc_t;
@@ -228,8 +229,8 @@ static void nxt_router_app_port_error(nxt_task_t *task,
static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i);
static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);
-static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
- nxt_apr_action_t action);
+static void nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app,
+ nxt_port_t *port, nxt_apr_action_t action);
static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
nxt_request_rpc_data_t *req_rpc_data);
static void nxt_router_http_request_error(nxt_task_t *task, void *obj,
@@ -393,32 +394,52 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
{
size_t size;
uint32_t stream;
- nxt_mp_t *mp;
nxt_int_t ret;
nxt_app_t *app;
nxt_buf_t *b;
- nxt_port_t *main_port;
+ nxt_port_t *dport;
nxt_runtime_t *rt;
nxt_app_joint_rpc_t *app_joint_rpc;
app = data;
- rt = task->thread->runtime;
- main_port = rt->port_by_type[NXT_PROCESS_MAIN];
+ nxt_thread_mutex_lock(&app->mutex);
- nxt_debug(task, "app '%V' %p start process", &app->name, app);
+ dport = app->proto_port;
- size = app->name.length + 1 + app->conf.length;
+ nxt_thread_mutex_unlock(&app->mutex);
- b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size);
+ if (dport != NULL) {
+ nxt_debug(task, "app '%V' %p start process", &app->name, app);
- if (nxt_slow_path(b == NULL)) {
- goto failed;
- }
+ b = NULL;
- nxt_buf_cpystr(b, &app->name);
- *b->mem.free++ = '\0';
- nxt_buf_cpystr(b, &app->conf);
+ } else {
+ if (app->proto_port_requests > 0) {
+ nxt_debug(task, "app '%V' %p wait for prototype process",
+ &app->name, app);
+
+ app->proto_port_requests++;
+
+ goto skip;
+ }
+
+ nxt_debug(task, "app '%V' %p start prototype process", &app->name, app);
+
+ rt = task->thread->runtime;
+ dport = rt->port_by_type[NXT_PROCESS_MAIN];
+
+ size = app->name.length + 1 + app->conf.length;
+
+ b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, size, 0);
+ if (nxt_slow_path(b == NULL)) {
+ goto failed;
+ }
+
+ nxt_buf_cpystr(b, &app->name);
+ *b->mem.free++ = '\0';
+ nxt_buf_cpystr(b, &app->conf);
+ }
app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port,
nxt_router_app_port_ready,
@@ -430,7 +451,7 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
stream = nxt_port_rpc_ex_stream(app_joint_rpc);
- ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS,
+ ret = nxt_port_socket_write(task, dport, NXT_PORT_MSG_START_PROCESS,
-1, stream, port->id, b);
if (nxt_slow_path(ret != NXT_OK)) {
nxt_port_rpc_cancel(task, port, stream);
@@ -440,26 +461,23 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
app_joint_rpc->app_joint = app->joint;
app_joint_rpc->generation = app->generation;
+ app_joint_rpc->proto = (b != NULL);
- nxt_router_app_joint_use(task, app->joint, 1);
+ if (b != NULL) {
+ app->proto_port_requests++;
- nxt_router_app_use(task, app, -1);
+ b = NULL;
+ }
- return;
+ nxt_router_app_joint_use(task, app->joint, 1);
failed:
if (b != NULL) {
- mp = b->data;
- nxt_mp_free(mp, b);
- nxt_mp_release(mp);
+ nxt_mp_free(b->data, b);
}
- nxt_thread_mutex_lock(&app->mutex);
-
- app->pending_processes--;
-
- nxt_thread_mutex_unlock(&app->mutex);
+skip:
nxt_router_app_use(task, app, -1);
}
@@ -583,14 +601,15 @@ nxt_request_rpc_data_unlink(nxt_task_t *task,
nxt_router_msg_cancel(task, req_rpc_data);
+ app = req_rpc_data->app;
+
if (req_rpc_data->app_port != NULL) {
- nxt_router_app_port_release(task, req_rpc_data->app_port,
+ nxt_router_app_port_release(task, app, req_rpc_data->app_port,
req_rpc_data->apr_action);
req_rpc_data->app_port = NULL;
}
- app = req_rpc_data->app;
r = req_rpc_data->request;
if (r != NULL) {
@@ -658,6 +677,12 @@ nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_router_greet_controller(task, msg->u.new_port);
}
+ if (port != NULL && port->type == NXT_PROCESS_PROTOTYPE) {
+ nxt_port_rpc_handler(task, msg);
+
+ return;
+ }
+
if (port == NULL || port->type != NXT_PROCESS_APP) {
if (msg->port_msg.stream == 0) {
@@ -683,6 +708,8 @@ nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
return;
}
+ nxt_debug(task, "new port id %d (%d)", port->id, port->type);
+
/*
* Port with "id == 0" is application 'main' port and it always
* should come with non-zero stream.
@@ -819,7 +846,8 @@ nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_app_t *app;
nxt_int_t ret;
nxt_str_t app_name;
- nxt_port_t *port, *reply_port, *shared_port, *old_shared_port;
+ nxt_port_t *reply_port, *shared_port, *old_shared_port;
+ nxt_port_t *proto_port;
nxt_port_msg_type_t reply;
reply_port = nxt_runtime_port_find(task->thread->runtime,
@@ -862,12 +890,15 @@ nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_thread_mutex_lock(&app->mutex);
- nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
+ proto_port = app->proto_port;
- (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1,
- 0, 0, NULL);
+ if (proto_port != NULL) {
+ nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name,
+ proto_port->pid);
- } nxt_queue_loop;
+ app->proto_port = NULL;
+ proto_port->app = NULL;
+ }
app->generation++;
@@ -883,6 +914,15 @@ nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_port_close(task, old_shared_port);
nxt_port_use(task, old_shared_port, -1);
+ if (proto_port != NULL) {
+ (void) nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT,
+ -1, 0, 0, NULL);
+
+ nxt_port_close(task, proto_port);
+
+ nxt_port_use(task, proto_port, -1);
+ }
+
reply = NXT_PORT_MSG_RPC_READY_LAST;
} else {
@@ -1292,12 +1332,6 @@ static nxt_conf_map_t nxt_router_app_limits_conf[] = {
NXT_CONF_MAP_MSEC,
offsetof(nxt_router_app_conf_t, timeout),
},
-
- {
- nxt_string("requests"),
- NXT_CONF_MAP_INT32,
- offsetof(nxt_router_app_conf_t, requests),
- },
};
@@ -1566,7 +1600,6 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
apcf.spare_processes = 0;
apcf.timeout = 0;
apcf.idle_timeout = 15000;
- apcf.requests = 0;
apcf.limits_value = NULL;
apcf.processes_value = NULL;
apcf.targets_value = NULL;
@@ -1646,7 +1679,6 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_debug(task, "application type: %V", &apcf.type);
nxt_debug(task, "application processes: %D", apcf.processes);
nxt_debug(task, "application request timeout: %M", apcf.timeout);
- nxt_debug(task, "application requests: %D", apcf.requests);
lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
@@ -1677,7 +1709,6 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
? apcf.spare_processes : 1;
app->timeout = apcf.timeout;
app->idle_timeout = apcf.idle_timeout;
- app->max_requests = apcf.requests;
app->targets = targets;
@@ -2744,54 +2775,67 @@ nxt_router_app_rpc_create(nxt_task_t *task,
uint32_t stream;
nxt_int_t ret;
nxt_buf_t *b;
- nxt_port_t *main_port, *router_port;
+ nxt_port_t *router_port, *dport;
nxt_runtime_t *rt;
nxt_app_rpc_t *rpc;
- rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_app_rpc_t));
- if (rpc == NULL) {
- goto fail;
- }
+ rt = task->thread->runtime;
- rpc->app = app;
- rpc->temp_conf = tmcf;
+ dport = app->proto_port;
- nxt_debug(task, "app '%V' prefork", &app->name);
+ if (dport == NULL) {
+ nxt_debug(task, "app '%V' prototype prefork", &app->name);
- size = app->name.length + 1 + app->conf.length;
+ size = app->name.length + 1 + app->conf.length;
- b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
- if (nxt_slow_path(b == NULL)) {
- goto fail;
- }
+ b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
+ if (nxt_slow_path(b == NULL)) {
+ goto fail;
+ }
- b->completion_handler = nxt_buf_dummy_completion;
+ b->completion_handler = nxt_buf_dummy_completion;
- nxt_buf_cpystr(b, &app->name);
- *b->mem.free++ = '\0';
- nxt_buf_cpystr(b, &app->conf);
+ nxt_buf_cpystr(b, &app->name);
+ *b->mem.free++ = '\0';
+ nxt_buf_cpystr(b, &app->conf);
+
+ dport = rt->port_by_type[NXT_PROCESS_MAIN];
+
+ } else {
+ nxt_debug(task, "app '%V' prefork", &app->name);
+
+ b = NULL;
+ }
- rt = task->thread->runtime;
- main_port = rt->port_by_type[NXT_PROCESS_MAIN];
router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
- stream = nxt_port_rpc_register_handler(task, router_port,
+ rpc = nxt_port_rpc_register_handler_ex(task, router_port,
nxt_router_app_prefork_ready,
nxt_router_app_prefork_error,
- -1, rpc);
- if (nxt_slow_path(stream == 0)) {
+ sizeof(nxt_app_rpc_t));
+ if (nxt_slow_path(rpc == NULL)) {
goto fail;
}
- ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS,
- -1, stream, router_port->id, b);
+ rpc->app = app;
+ rpc->temp_conf = tmcf;
+ rpc->proto = (b != NULL);
+
+ stream = nxt_port_rpc_ex_stream(rpc);
+ ret = nxt_port_socket_write(task, dport,
+ NXT_PORT_MSG_START_PROCESS,
+ -1, stream, router_port->id, b);
if (nxt_slow_path(ret != NXT_OK)) {
nxt_port_rpc_cancel(task, router_port, stream);
goto fail;
}
- app->pending_processes++;
+ if (b == NULL) {
+ nxt_port_rpc_ex_set_peer(task, router_port, rpc, dport->pid);
+
+ app->pending_processes++;
+ }
return;
@@ -2816,9 +2860,24 @@ nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
port = msg->u.new_port;
nxt_assert(port != NULL);
- nxt_assert(port->type == NXT_PROCESS_APP);
nxt_assert(port->id == 0);
+ if (rpc->proto) {
+ nxt_assert(app->proto_port == NULL);
+ nxt_assert(port->type == NXT_PROCESS_PROTOTYPE);
+
+ nxt_port_inc_use(port);
+
+ app->proto_port = port;
+ port->app = app;
+
+ nxt_router_app_rpc_create(task, rpc->temp_conf, app);
+
+ return;
+ }
+
+ nxt_assert(port->type == NXT_PROCESS_APP);
+
port->app = app;
port->main_app_port = port;
@@ -2860,10 +2919,16 @@ nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
app = rpc->app;
tmcf = rpc->temp_conf;
- nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"",
- &app->name);
+ if (rpc->proto) {
+ nxt_log(task, NXT_LOG_WARN, "failed to start prototype \"%V\"",
+ &app->name);
- app->pending_processes--;
+ } else {
+ nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"",
+ &app->name);
+
+ app->pending_processes--;
+ }
nxt_router_conf_error(task, tmcf);
}
@@ -4211,7 +4276,7 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_thread_mutex_unlock(&app->mutex);
- nxt_router_app_port_release(task, app_port, NXT_APR_UPGRADE);
+ nxt_router_app_port_release(task, app, app_port, NXT_APR_UPGRADE);
req_rpc_data->apr_action = NXT_APR_CLOSE;
nxt_debug(task, "stream #%uD upgrade", req_rpc_data->stream);
@@ -4422,8 +4487,9 @@ static void
nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)
{
+ uint32_t n;
nxt_app_t *app;
- nxt_bool_t start_process;
+ nxt_bool_t start_process, restarted;
nxt_port_t *port;
nxt_app_joint_t *app_joint;
nxt_app_joint_rpc_t *app_joint_rpc;
@@ -4436,7 +4502,6 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_assert(app_joint != NULL);
nxt_assert(port != NULL);
- nxt_assert(port->type == NXT_PROCESS_APP);
nxt_assert(port->id == 0);
app = app_joint->app;
@@ -4453,11 +4518,51 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_thread_mutex_lock(&app->mutex);
+ restarted = (app->generation != app_joint_rpc->generation);
+
+ if (app_joint_rpc->proto) {
+ nxt_assert(app->proto_port == NULL);
+ nxt_assert(port->type == NXT_PROCESS_PROTOTYPE);
+
+ n = app->proto_port_requests;
+ app->proto_port_requests = 0;
+
+ if (nxt_slow_path(restarted)) {
+ nxt_thread_mutex_unlock(&app->mutex);
+
+ nxt_debug(task, "proto port ready for restarted app, send QUIT");
+
+ nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0,
+ NULL);
+
+ } else {
+ port->app = app;
+ app->proto_port = port;
+
+ nxt_thread_mutex_unlock(&app->mutex);
+
+ nxt_port_use(task, port, 1);
+ }
+
+ port = task->thread->runtime->port_by_type[NXT_PROCESS_ROUTER];
+
+ while (n > 0) {
+ nxt_router_app_use(task, app, 1);
+
+ nxt_router_start_app_process_handler(task, port, app);
+
+ n--;
+ }
+
+ return;
+ }
+
+ nxt_assert(port->type == NXT_PROCESS_APP);
nxt_assert(app->pending_processes != 0);
app->pending_processes--;
- if (nxt_slow_path(app->generation != app_joint_rpc->generation)) {
+ if (nxt_slow_path(restarted)) {
nxt_debug(task, "new port ready for restarted app, send QUIT");
start_process = !task->thread->engine->shutdown
@@ -4493,7 +4598,7 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_router_app_shared_port_send(task, port);
- nxt_router_app_port_release(task, port, NXT_APR_NEW_PORT);
+ nxt_router_app_port_release(task, app, port, NXT_APR_NEW_PORT);
}
@@ -4600,7 +4705,6 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
}
-
nxt_inline nxt_port_t *
nxt_router_app_get_port_for_quit(nxt_task_t *task, nxt_app_t *app)
{
@@ -4670,19 +4774,15 @@ nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app)
static void
-nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
+nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app, nxt_port_t *port,
nxt_apr_action_t action)
{
int inc_use;
uint32_t got_response, dec_requests;
- nxt_app_t *app;
- nxt_bool_t port_unchained, send_quit, adjust_idle_timer;
+ nxt_bool_t adjust_idle_timer;
nxt_port_t *main_app_port;
nxt_assert(port != NULL);
- nxt_assert(port->app != NULL);
-
- app = port->app;
inc_use = 0;
got_response = 0;
@@ -4725,40 +4825,18 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
nxt_thread_mutex_lock(&app->mutex);
- main_app_port->app_responses += got_response;
main_app_port->active_requests -= got_response + dec_requests;
app->active_requests -= got_response + dec_requests;
- if (main_app_port->pair[1] != -1
- && (app->max_requests == 0
- || main_app_port->app_responses < app->max_requests))
- {
- if (main_app_port->app_link.next == NULL) {
- nxt_queue_insert_tail(&app->ports, &main_app_port->app_link);
+ if (main_app_port->pair[1] != -1 && main_app_port->app_link.next == NULL) {
+ nxt_queue_insert_tail(&app->ports, &main_app_port->app_link);
- nxt_port_inc_use(main_app_port);
- }
- }
-
- send_quit = (app->max_requests > 0
- && main_app_port->app_responses >= app->max_requests);
-
- if (send_quit) {
- port_unchained = nxt_queue_chk_remove(&main_app_port->app_link);
-
- nxt_port_hash_remove(&app->port_hash, main_app_port);
- app->port_hash_count--;
-
- main_app_port->app = NULL;
- app->processes--;
-
- } else {
- port_unchained = 0;
+ nxt_port_inc_use(main_app_port);
}
adjust_idle_timer = 0;
- if (main_app_port->pair[1] != -1 && !send_quit
+ if (main_app_port->pair[1] != -1
&& main_app_port->active_requests == 0
&& main_app_port->active_websockets == 0
&& main_app_port->idle_link.next == NULL)
@@ -4803,19 +4881,6 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
goto adjust_use;
}
- if (send_quit) {
- nxt_debug(task, "app '%V' %p send QUIT to port", &app->name, app);
-
- nxt_port_socket_write(task, main_app_port, NXT_PORT_MSG_QUIT, -1, 0, 0,
- NULL);
-
- if (port_unchained) {
- nxt_port_use(task, main_app_port, -1);
- }
-
- goto adjust_use;
- }
-
nxt_debug(task, "app '%V' %p requests queue is empty, keep the port",
&app->name, app);
@@ -4839,6 +4904,20 @@ nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
nxt_thread_mutex_lock(&app->mutex);
+ if (port == app->proto_port) {
+ app->proto_port = NULL;
+ port->app = NULL;
+
+ nxt_thread_mutex_unlock(&app->mutex);
+
+ nxt_debug(task, "app '%V' prototype pid %PI closed", &app->name,
+ port->pid);
+
+ nxt_port_use(task, port, -1);
+
+ return;
+ }
+
nxt_port_hash_remove(&app->port_hash, port);
app->port_hash_count--;
@@ -5027,7 +5106,7 @@ static void
nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
{
nxt_app_t *app;
- nxt_port_t *port;
+ nxt_port_t *port, *proto_port;
nxt_app_joint_t *app_joint;
app_joint = obj;
@@ -5039,10 +5118,6 @@ nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
break;
}
- nxt_debug(task, "send QUIT to app '%V' pid %PI", &app->name, port->pid);
-
- nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
-
nxt_port_use(task, port, -1);
}
@@ -5063,8 +5138,28 @@ nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
nxt_port_use(task, port, -1);
}
+ proto_port = app->proto_port;
+
+ if (proto_port != NULL) {
+ nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name,
+ proto_port->pid);
+
+ app->proto_port = NULL;
+ proto_port->app = NULL;
+ }
+
nxt_thread_mutex_unlock(&app->mutex);
+ if (proto_port != NULL) {
+ nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT,
+ -1, 0, 0, NULL);
+
+ nxt_port_close(task, proto_port);
+
+ nxt_port_use(task, proto_port, -1);
+ }
+
+ nxt_assert(app->proto_port == NULL);
nxt_assert(app->processes == 0);
nxt_assert(app->active_requests == 0);
nxt_assert(app->port_hash_count == 0);
@@ -5501,8 +5596,8 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
*p++ = '\0';
}
- req->query_length = r->args != NULL ? (uint32_t) r->args->length : 0;
- if (r->args != NULL && r->args->start != NULL) {
+ req->query_length = (uint32_t) r->args->length;
+ if (r->args->start != NULL) {
query_pos = nxt_pointer_to(target_pos,
r->args->start - r->target.start);