/*
* Copyright (C) Igor Sysoev
* Copyright (C) Valentin V. Bartenev
* Copyright (C) NGINX, Inc.
*/
#include <nxt_router.h>
#include <nxt_conf.h>
#include <nxt_http.h>
typedef struct {
nxt_str_t type;
uint32_t workers;
nxt_msec_t timeout;
nxt_msec_t res_timeout;
uint32_t requests;
nxt_conf_value_t *limits_value;
} nxt_router_app_conf_t;
typedef struct {
nxt_str_t application;
} nxt_router_listener_conf_t;
typedef struct nxt_msg_info_s {
nxt_buf_t *buf;
nxt_port_mmap_tracking_t tracking;
nxt_work_handler_t completion_handler;
} nxt_msg_info_t;
typedef struct nxt_req_app_link_s nxt_req_app_link_t;
typedef struct {
uint32_t stream;
nxt_app_t *app;
nxt_port_t *app_port;
nxt_app_parse_ctx_t *ap;
nxt_msg_info_t msg_info;
nxt_req_app_link_t *ra;
nxt_queue_link_t link; /* for nxt_conn_t.requests */
} nxt_req_conn_link_t;
struct nxt_req_app_link_s {
uint32_t stream;
nxt_atomic_t use_count;
nxt_port_t *app_port;
nxt_port_t *reply_port;
nxt_app_parse_ctx_t *ap;
nxt_msg_info_t msg_info;
nxt_req_conn_link_t *rc;
nxt_nsec_t res_time;
nxt_queue_link_t link_app_requests; /* for nxt_app_t.requests */
nxt_queue_link_t link_port_pending; /* for nxt_port_t.pending_requests */
nxt_queue_link_t link_app_pending; /* for nxt_app_t.pending */
nxt_mp_t *mem_pool;
nxt_work_t work;
int err_code;
const char *err_str;
};
typedef struct {
nxt_socket_conf_t *socket_conf;
nxt_router_temp_conf_t *temp_conf;
} nxt_socket_rpc_t;
struct nxt_port_select_state_s {
nxt_app_t *app;
nxt_req_app_link_t *ra;
nxt_port_t *failed_port;
int failed_port_use_delta;
nxt_bool_t can_start_worker;
nxt_req_app_link_t *shared_ra;
nxt_port_t *port;
};
typedef struct nxt_port_select_state_s nxt_port_select_state_t;
static void nxt_router_port_select(nxt_task_t *task,
nxt_port_select_state_t *state);
static nxt_int_t nxt_router_port_post_select(nxt_task_t *task,
nxt_port_select_state_t *state);
static nxt_int_t nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app);
nxt_inline void
nxt_router_ra_inc_use(nxt_req_app_link_t *ra)
{
nxt_atomic_fetch_add(&ra->use_count, 1);
}
nxt_inline void
nxt_router_ra_dec_use(nxt_req_app_link_t *ra)
{
int c;
c = nxt_atomic_fetch_add(&ra->use_count, -1);
nxt_assert(c > 1);
}
static void nxt_router_ra_use(nxt_task_t *task, nxt_req_app_link_t *ra, int i);
static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conf_ready(nxt_task_t *task,
nxt_router_temp_conf_t *tmcf);
static void nxt_router_conf_error(nxt_task_t *task,
nxt_router_temp_conf_t *tmcf);
static void nxt_router_conf_send(nxt_task_t *task,
nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
static nxt_app_t *nxt_router_listener_application(nxt_router_temp_conf_t *tmcf,
nxt_str_t *name);
static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
static void nxt_router_listen_socket_ready(nxt_task_t *task,
nxt_port_recv_msg_t *msg, void *data);
static void nxt_router_listen_socket_error(nxt_task_t *task,
nxt_port_recv_msg_t *msg, void *data);
static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);
static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
const nxt_event_interface_t *interface);
static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
nxt_router_engine_conf_t *recf);
static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
nxt_router_engine_conf_t *recf);
static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
nxt_router_engine_conf_t *recf);
static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
nxt_work_handler_t handler);
static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
nxt_router_engine_conf_t *recf);
static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);
static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
nxt_router_temp_conf_t *tmcf);
static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
nxt_event_engine_t *engine);
static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
nxt_router_temp_conf_t *tmcf);
static void nxt_router_engines_post(nxt_router_t *router,
nxt_router_temp_conf_t *tmcf);
static void nxt_router_engine_post(nxt_event_engine_t *engine,
nxt_work_t *jobs);
static void nxt_router_thread_start(void *data);
static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
void *data);
static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
void *data);
static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
void *data);
static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
void *data);
static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
void *data);
static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
void *data);
static void nxt_router_listen_socket_release(nxt_task_t *task,
nxt_socket_conf_t *skcf);
static void nxt_router_conf_release(nxt_task_t *task,
nxt_socket_conf_joint_t *joint);
static void nxt_router_app_port_ready(nxt_task_t *task,
nxt_port_recv_msg_t *msg, void *data);
static void nxt_router_app_port_error(nxt_task_t *task,
nxt_port_recv_msg_t *msg, void *data);
static nxt_port_t * nxt_router_app_get_idle_port(nxt_app_t *app);
static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
uint32_t request_failed, uint32_t got_response);
static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app,
nxt_req_app_link_t *ra);
static void nxt_router_app_prepare_request(nxt_task_t *task,
nxt_req_app_link_t *ra);
static nxt_int_t nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
nxt_app_wmsg_t *wmsg);
static nxt_int_t nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
nxt_app_wmsg_t *wmsg);
static nxt_int_t nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
nxt_app_wmsg_t *wmsg);
static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data);
static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
static const nxt_http_request_state_t nxt_http_request_send_state;
static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
static nxt_router_t *nxt_router;
static nxt_app_prepare_msg_t nxt_app_prepare_msg[] = {
nxt_python_prepare_msg,
nxt_php_prepare_msg,
nxt_go_prepare_msg,
};
nxt_int_t
nxt_router_start(nxt_task_t *task, void *data)
{
nxt_int_t ret;
nxt_router_t *router;
nxt_runtime_t *rt;
rt = task->thread->runtime;
ret = nxt_http_init(task, rt);
if (nxt_slow_path(ret != NXT_OK)) {
return ret;
}
router = nxt_zalloc(sizeof(nxt_router_t));
if (nxt_slow_path(router == NULL)) {
return NXT_ERROR;
}
nxt_queue_init(&router->engines);
nxt_queue_init(&router->sockets);
nxt_queue_init(&router->apps);
nxt_router = router;
return NXT_OK;
}
static void
nxt_router_start_worker_handler(nxt_task_t *task, nxt_port_t *port, void *data)
{
size_t size;
uint32_t stream;
nxt_mp_t *mp;
nxt_app_t *app;
nxt_buf_t *b;
nxt_port_t *main_port;
nxt_runtime_t *rt;
app = data;
rt = task->thread->runtime;
main_port = rt->port_by_type[NXT_PROCESS_MAIN];
nxt_debug(task, "app '%V' %p start worker", &app->name, app);
size = app->name.length + 1 + app->conf.length;
b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size);
if (nxt_slow_path(b == NULL)) {
goto failed;
}
nxt_buf_cpystr(b, &app->name);
*b->mem.free++ = '\0';
nxt_buf_cpystr(b, &app->conf);
stream = nxt_port_rpc_register_handler(task, port,
nxt_router_app_port_ready,
nxt_router_app_port_error,
-1, app);
if (nxt_slow_path(stream == 0)) {
mp = b->data;
nxt_mp_free(mp, b);
nxt_mp_release(mp);
goto failed;
}
nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1,
stream, port->id, b);
return;
failed:
nxt_thread_mutex_lock(&app->mutex);
app->pending_workers--;
nxt_thread_mutex_unlock(&app->mutex);
nxt_router_app_use(task, app, -1);
}
static nxt_int_t
nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app)
{
nxt_int_t res;
nxt_port_t *router_port;
nxt_runtime_t *rt;
rt = task->thread->runtime;
router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
nxt_router_app_use(task, app, 1);
res = nxt_port_post(task, router_port, nxt_router_start_worker_handler,
app);
if (res == NXT_OK) {
return res;
}
nxt_thread_mutex_lock(&app->mutex);
app->pending_workers--;
nxt_thread_mutex_unlock(&app->mutex);
nxt_router_app_use(task, app, -1);
return NXT_ERROR;
}
nxt_inline void
nxt_router_ra_init(nxt_task_t *task, nxt_req_app_link_t *ra,
nxt_req_conn_link_t *rc)
{
nxt_event_engine_t *engine;
engine = task->thread->engine;
nxt_memzero(ra, sizeof(nxt_req_app_link_t));
ra->stream = rc->stream;
ra->use_count = 1;
ra->rc = rc;
rc->ra = ra;
ra->reply_port = engine->port;
ra->ap = rc->ap;
ra->work.handler = NULL;
ra->work.task = &engine->task;
ra->work.obj = ra;
ra->work.data = engine;
}
nxt_inline nxt_req_app_link_t *
nxt_router_ra_create(nxt_task_t *task, nxt_req_app_link_t *ra_src)
{
nxt_mp_t *mp;
nxt_req_app_link_t *ra;
if (ra_src->mem_pool != NULL) {
return ra_src;
}
mp = ra_src->ap->mem_pool;
ra = nxt_mp_alloc(mp, sizeof(nxt_req_app_link_t));
if (nxt_slow_path(ra == NULL)) {
ra_src->rc->ra = NULL;
ra_src->rc = NULL;
return NULL;
}
nxt_mp_retain(mp);
nxt_router_ra_init(task, ra, ra_src->rc);
ra->mem_pool = mp;
return ra;
}
nxt_inline nxt_bool_t
nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info,
uint32_t stream)
{
nxt_buf_t *b, *next;
nxt_bool_t cancelled;
if (msg_info->buf == NULL) {
return 0;
}
cancelled = nxt_port_mmap_tracking_cancel(task, &msg_info->tracking,
stream);
if (cancelled) {
nxt_debug(task, "stream #%uD: cancelled by router", stream);
}
for (b = msg_info->buf; b != NULL; b = next) {
next = b->next;
b->completion_handler = msg_info->completion_handler;
if (b->is_port_mmap_sent) {
b->is_port_mmap_sent = cancelled == 0;
b->completion_handler(task, b, b->parent);
}
}
msg_info->buf = NULL;
return cancelled;
}
static void
nxt_router_ra_update_peer(nxt_task_t *task, nxt_req_app_link_t *ra);
static void
nxt_router_ra_update_peer_handler(nxt_task_t *task, void *obj, void *data)
{
nxt_req_app_link_t *ra;
ra = obj;
nxt_router_ra_update_peer(task, ra);
nxt_router_ra_use(task, ra, -1);
}
static void
nxt_router_ra_update_peer(nxt_task_t *task, nxt_req_app_link_t *ra)
{
nxt_event_engine_t *engine;
nxt_req_conn_link_t *rc;
engine = ra->work.data;
if (task->thread->engine != engine) {
nxt_router_ra_inc_use(ra);
ra->work.handler = nxt_router_ra_update_peer_handler;
ra->work.task = &engine->task;
ra->work.next = NULL;
nxt_debug(task, "ra stream #%uD post update peer to %p",
ra->stream, engine);
nxt_event_engine_post(engine, &ra->work);
return;
}
nxt_debug(task, "ra stream #%uD update peer", ra->stream);
rc = ra->rc;
if (rc != NULL && ra->app_port != NULL) {
nxt_port_rpc_ex_set_peer(task, engine->port, rc, ra->app_port->pid);
}
nxt_router_ra_use(task, ra, -1);
}
static void
nxt_router_ra_release(nxt_task_t *task, nxt_req_app_link_t *ra)
{
nxt_mp_t *mp;
nxt_req_conn_link_t *rc;
nxt_assert(task->thread->engine == ra->work.data);
nxt_assert(ra->use_count == 0);
nxt_debug(task, "ra stream #%uD release", ra->stream);
rc = ra->rc;
if (rc != NULL) {
if (nxt_slow_path(ra->err_code != 0)) {
nxt_http_request_error(task, rc->ap->request, ra->err_code);
} else {
rc->app_port = ra->app_port;
rc->msg_info = ra->msg_info;
if (rc->app->timeout != 0) {
rc->ap->timer.handler = nxt_router_app_timeout;
nxt_timer_add(task->thread->engine, &rc->ap->timer,
rc->app->timeout);
}
ra->app_port = NULL;
ra->msg_info.buf = NULL;
}
rc->ra = NULL;
ra->rc = NULL;
}
if (ra->app_port != NULL) {
nxt_router_app_port_release(task, ra->app_port, 0, 1);
ra->app_port = NULL;
}
nxt_router_msg_cancel(task, &ra->msg_info, ra->stream);
mp = ra->mem_pool;
if (mp != NULL) {
nxt_mp_free(mp, ra);
nxt_mp_release(mp);
}
}
static void
nxt_router_ra_release_handler(nxt_task_t *task, void *obj, void *data)
{
nxt_req_app_link_t *ra;
ra = obj;
nxt_assert(ra->work.data == data);
nxt_atomic_fetch_add(&ra->use_count, -1);
nxt_router_ra_release(task, ra);
}
static void
nxt_router_ra_use(nxt_task_t *task, nxt_req_app_link_t *ra, int i)
{
int c;
nxt_event_engine_t *engine;
c = nxt_atomic_fetch_add(&ra->use_count, i);
if (i < 0 && c == -i) {
engine = ra->work.data;
if (task->thread->engine == engine) {
nxt_router_ra_release(task, ra);
return;
}
nxt_router_ra_inc_use(ra);
ra->work.handler = nxt_router_ra_release_handler;
ra->work.task = &engine->task;
ra->work.next = NULL;
nxt_debug(task, "ra stream #%uD post release to %p",
ra->stream, engine);
nxt_event_engine_post(engine, &ra->work);
}
}
nxt_inline void
nxt_router_ra_error(nxt_req_app_link_t *ra, int code, const char* str)
{
ra->app_port = NULL;
ra->err_code = code;
ra->err_str = str;
}
nxt_inline void
nxt_router_ra_pending(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra)
{
nxt_queue_insert_tail(&ra->app_port->pending_requests,
&ra->link_port_pending);
nxt_queue_insert_tail(&app->pending, &ra->link_app_pending);
nxt_router_ra_inc_use(ra);
ra->res_time = nxt_thread_monotonic_time(task->thread) + app->res_timeout;
nxt_debug(task, "ra stream #%uD enqueue to pending_requests", ra->stream);
}
nxt_inline nxt_bool_t
nxt_queue_chk_remove(nxt_queue_link_t *lnk)
{
if (lnk->next != NULL) {
nxt_queue_remove(lnk);
lnk->next = NULL;
return 1;
}
return 0;
}
nxt_inline void
nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc)
{
int ra_use_delta;
nxt_req_app_link_t *ra;
if (rc->app_port != NULL) {
nxt_router_app_port_release(task, rc->app_port, 0, 1);
rc->app_port = NULL;
}
nxt_router_msg_cancel(task, &rc->msg_info, rc->stream);
ra = rc->ra;
if (ra != NULL) {
rc->ra = NULL;
ra->rc = NULL;
ra_use_delta = 0;
nxt_thread_mutex_lock(&rc->app->mutex);
if (ra->link_app_requests.next == NULL
&& ra->link_port_pending.next == NULL
&& ra->link_app_pending.next == NULL)
{
ra = NULL;
} else {
ra_use_delta -= nxt_queue_chk_remove(&ra->link_app_requests);
ra_use_delta -= nxt_queue_chk_remove(&ra->link_port_pending);
nxt_queue_chk_remove(&ra->link_app_pending);
}
nxt_thread_mutex_unlock(&rc->app->mutex);
if (ra != NULL) {
nxt_router_ra_use(task, ra, ra_use_delta);
}
}
if (rc->app != NULL) {
nxt_router_app_use(task, rc->app, -1);
rc->app = NULL;
}
if (rc->ap != NULL) {
nxt_app_http_req_done(task, rc->ap);
rc->ap = NULL;
}
}
void
nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
nxt_port_new_port_handler(task, msg);
if (msg->port_msg.stream == 0) {
return;
}
if (msg->u.new_port == NULL
|| msg->u.new_port->type != NXT_PROCESS_WORKER)
{
msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
}
nxt_port_rpc_handler(task, msg);
}
void
nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
nxt_int_t ret;
nxt_buf_t *b;
nxt_router_temp_conf_t *tmcf;
tmcf = nxt_router_temp_conf(task);
if (nxt_slow_path(tmcf == NULL)) {
return;
}
nxt_debug(task, "nxt_router_conf_data_handler(%d): %*s",
nxt_buf_used_size(msg->buf),
nxt_buf_used_size(msg->buf), msg->buf->mem.pos);
b = nxt_buf_chk_make_plain(tmcf->conf->mem_pool, msg->buf, msg->size);
nxt_assert(b != NULL);
tmcf->conf->router = nxt_router;
tmcf->stream = msg->port_msg.stream;
tmcf->port = nxt_runtime_port_find(task->thread->runtime,
msg->port_msg.pid,
msg->port_msg.reply_port);
ret = nxt_router_conf_create(task, tmcf, b->mem.pos, b->mem.free);
if (nxt_fast_path(ret == NXT_OK)) {
nxt_router_conf_apply(task, tmcf, NULL);
} else {
nxt_router_conf_error(task, tmcf);
}
}
static void
nxt_router_worker_remove_pid(nxt_task_t *task, nxt_port_t *port, void *data)
{
union {
nxt_pid_t removed_pid;
void *data;
} u;
u.data = data;
nxt_port_rpc_remove_peer(task, port, u.removed_pid);
}
void
nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
nxt_event_engine_t *engine;
nxt_port_remove_pid_handler(task, msg);
if (msg->port_msg.stream == 0) {
return;
}
nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
{
nxt_port_post(task, engine->port, nxt_router_worker_remove_pid,
msg->u.data);
}
nxt_queue_loop;
msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
nxt_port_rpc_handler(task, msg);
}
static nxt_router_temp_conf_t *
nxt_router_temp_conf(nxt_task_t *task)
{
nxt_mp_t *mp, *tmp;
nxt_router_conf_t *rtcf;
nxt_router_temp_conf_t *tmcf;
mp = nxt_mp_create(1024, 128, 256, 32);
if (nxt_slow_path(mp == NULL)) {
return NULL;
}
rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
if (nxt_slow_path(rtcf == NULL)) {
goto fail;
}
rtcf->mem_pool = mp;
tmp = nxt_mp_create(1024, 128, 256, 32);
if (nxt_slow_path(tmp == NULL)) {
goto fail;
}
tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
if (nxt_slow_path(tmcf == NULL)) {
goto temp_fail;
}
tmcf->mem_pool = tmp;
tmcf->conf = rtcf;
tmcf->count = 1;
tmcf->engine = task->thread->engine;
tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
sizeof(nxt_router_engine_conf_t));
if (nxt_slow_path(tmcf->engines == NULL)) {
goto temp_fail;
}
nxt_queue_init(&tmcf->deleting);
nxt_queue_init(&tmcf->keeping);
nxt_queue_init(&tmcf->updating);
nxt_queue_init(&tmcf->pending);
nxt_queue_init(&tmcf->creating);
nxt_queue_init(&tmcf->apps);
nxt_queue_init(&tmcf->previous);
return tmcf;
temp_fail:
nxt_mp_destroy(tmp);
fail:
nxt_mp_destroy(mp);
return NULL;
}
static void
nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
{
nxt_int_t ret;
nxt_router_t *router;
nxt_runtime_t *rt;
nxt_queue_link_t *qlk;
nxt_socket_conf_t *skcf;
nxt_router_temp_conf_t *tmcf;
const nxt_event_interface_t *interface;
tmcf = obj;
qlk = nxt_queue_first(&tmcf->pending);
if (qlk != nxt_queue_tail(&tmcf->pending)) {
nxt_queue_remove(qlk);
nxt_queue_insert_tail(&tmcf->creating, qlk);
skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
return;
}
rt = task->thread->runtime;
interface = nxt_service_get(rt->services, "engine", NULL);
router = tmcf->conf->router;
ret = nxt_router_engines_create(task, router, tmcf, interface);
if (nxt_slow_path(ret != NXT_OK)) {
goto fail;
}
ret = nxt_router_threads_create(task, rt, tmcf);
if (nxt_slow_path(ret != NXT_OK)) {
goto fail;
}
nxt_router_apps_sort(task, router, tmcf);
nxt_router_engines_post(router, tmcf);
nxt_queue_add(&router->sockets, &tmcf->updating);
nxt_queue_add(&router->sockets, &tmcf->creating);
nxt_router_conf_ready(task, tmcf);
return;
fail:
nxt_router_conf_error(task, tmcf);
return;
}
static void
nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
{
nxt_joint_job_t *job;
job = obj;
nxt_router_conf_ready(task, job->tmcf);
}
static void
nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
{
nxt_debug(task, "temp conf count:%D", tmcf->count);
if (--tmcf->count == 0) {
nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
}
}
static void
nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
{
nxt_socket_t s;
nxt_router_t *router;
nxt_queue_link_t *qlk;
nxt_socket_conf_t *skcf;
nxt_log(task, NXT_LOG_CRIT, "failed to apply new conf");
for (qlk = nxt_queue_first(&tmcf->creating);
qlk != nxt_queue_tail(&tmcf->creating);
qlk = nxt_queue_next(qlk))
{
skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
s = skcf->listen->socket;
if (s != -1) {
nxt_socket_close(task, s);
}
nxt_free(skcf->listen);
}
router = tmcf->conf->router;
nxt_queue_add(&router->sockets, &tmcf->keeping);
nxt_queue_add(&router->sockets, &tmcf->deleting);
nxt_queue_add(&router->apps, &tmcf->previous);
// TODO: new engines and threads
nxt_mp_destroy(tmcf->conf->mem_pool);
nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
}
static void
nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_port_msg_type_t type)
{
nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
}
static nxt_conf_map_t nxt_router_conf[] = {
{
nxt_string("listeners_threads"),
NXT_CONF_MAP_INT32,
offsetof(nxt_router_conf_t, threads),
},
};
static nxt_conf_map_t nxt_router_app_conf[] = {
{
nxt_string("type"),
NXT_CONF_MAP_STR,
offsetof(nxt_router_app_conf_t, type),
},
{
nxt_string("workers"),
NXT_CONF_MAP_INT32,
offsetof(nxt_router_app_conf_t, workers),
},
{
nxt_string("limits"),
NXT_CONF_MAP_PTR,
offsetof(nxt_router_app_conf_t, limits_value),
},
};
static nxt_conf_map_t nxt_router_app_limits_conf[] = {
{
nxt_string("timeout"),
NXT_CONF_MAP_MSEC,
offsetof(nxt_router_app_conf_t, timeout),
},
{
nxt_string("reschedule_timeout"),
NXT_CONF_MAP_MSEC,
offsetof(nxt_router_app_conf_t, res_timeout),
},
{
nxt_string("requests"),
NXT_CONF_MAP_INT32,
offsetof(nxt_router_app_conf_t, requests),
},
};
static nxt_conf_map_t nxt_router_listener_conf[] = {
{
nxt_string("application"),
NXT_CONF_MAP_STR,
offsetof(nxt_router_listener_conf_t, application),
},
};
static nxt_conf_map_t nxt_router_http_conf[] = {
{
nxt_string("header_buffer_size"),
NXT_CONF_MAP_SIZE,
offsetof(nxt_socket_conf_t, header_buffer_size),
},
{
nxt_string("large_header_buffer_size"),
NXT_CONF_MAP_SIZE,
offsetof(nxt_socket_conf_t, large_header_buffer_size),
},
{
nxt_string("large_header_buffers"),
NXT_CONF_MAP_SIZE,
offsetof(nxt_socket_conf_t, large_header_buffers),
},
{
nxt_string("body_buffer_size"),
NXT_CONF_MAP_SIZE,
offsetof(nxt_socket_conf_t, body_buffer_size),
},
{
nxt_string("max_body_size"),
NXT_CONF_MAP_SIZE,
offsetof(nxt_socket_conf_t, max_body_size),
},
{
nxt_string("idle_timeout"),
NXT_CONF_MAP_MSEC,
offsetof(nxt_socket_conf_t, idle_timeout),
},
{
nxt_string("header_read_timeout"),
NXT_CONF_MAP_MSEC,
offsetof(nxt_socket_conf_t, header_read_timeout),
},
{
nxt_string("body_read_timeout"),
NXT_CONF_MAP_MSEC,
offsetof(nxt_socket_conf_t, body_read_timeout),
},
{
nxt_string("send_timeout"),
NXT_CONF_MAP_MSEC,
offsetof(nxt_socket_conf_t, send_timeout),
},
};
static nxt_int_t
nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
u_char *start, u_char *end)
{
u_char *p;
size_t size;
nxt_mp_t *mp;
uint32_t next;
nxt_int_t ret;
nxt_str_t name;
nxt_app_t *app, *prev;
nxt_router_t *router;
nxt_conf_value_t *conf, *http;
nxt_conf_value_t *applications, *application;
nxt_conf_value_t *listeners, *listener;
nxt_socket_conf_t *skcf;
nxt_app_lang_module_t *lang;
nxt_router_app_conf_t apcf;
nxt_router_listener_conf_t lscf;
static nxt_str_t http_path = nxt_string("/http");
static nxt_str_t applications_path = nxt_string("/applications");
static nxt_str_t listeners_path = nxt_string("/listeners");
conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
if (conf == NULL) {
nxt_log(task, NXT_LOG_CRIT, "configuration parsing error");
return NXT_ERROR;
}
mp = tmcf->conf->mem_pool;
ret = nxt_conf_map_object(mp, conf, nxt_router_conf,
nxt_nitems(nxt_router_conf), tmcf->conf);
if (ret != NXT_OK) {
nxt_log(task, NXT_LOG_CRIT, "root map error");
return NXT_ERROR;
}
if (tmcf->conf->threads == 0) {
tmcf->conf->threads = nxt_ncpu;
}
applications = nxt_conf_get_path(conf, &applications_path);
if (applications == NULL) {
nxt_log(task, NXT_LOG_CRIT, "no \"applications\" block");
return NXT_ERROR;
}
router = tmcf->conf->router;
next = 0;
for ( ;; ) {
application = nxt_conf_next_object_member(applications, &name, &next);
if (application == NULL) {
break;
}
nxt_debug(task, "application \"%V\"", &name);
size = nxt_conf_json_length(application, NULL);
app = nxt_malloc(sizeof(nxt_app_t) + name.length + size);
if (app == NULL) {
goto fail;
}
nxt_memzero(app, sizeof(nxt_app_t));
app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t) + name.length);
p = nxt_conf_json_print(app->conf.start, application, NULL);
app->conf.length = p - app->conf.start;
nxt_assert(app->conf.length <= size);
nxt_debug(task, "application conf \"%V\"", &app->conf);
prev = nxt_router_app_find(&router->apps, &name);
if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
nxt_free(app);
nxt_queue_remove(&prev->link);
nxt_queue_insert_tail(&tmcf->previous, &prev->link);
continue;
}
apcf.workers = 1;
apcf.timeout = 0;
apcf.res_timeout = 1000;
apcf.requests = 0;
apcf.limits_value = NULL;
ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
nxt_nitems(nxt_router_app_conf), &apcf);
if (ret != NXT_OK) {
nxt_log(task, NXT_LOG_CRIT, "application map error");
goto app_fail;
}
if (apcf.limits_value != NULL) {
if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
nxt_log(task, NXT_LOG_CRIT, "application limits is not object");
goto app_fail;
}
ret = nxt_conf_map_object(mp, apcf.limits_value,
nxt_router_app_limits_conf,
nxt_nitems(nxt_router_app_limits_conf),
&apcf);
if (ret != NXT_OK) {
nxt_log(task, NXT_LOG_CRIT, "application limits map error");
goto app_fail;
}
}
nxt_debug(task, "application type: %V", &apcf.type);
nxt_debug(task, "application workers: %D", apcf.workers);
nxt_debug(task, "application request timeout: %D", apcf.timeout);
nxt_debug(task, "application reschedule timeout: %D", apcf.res_timeout);
nxt_debug(task, "application requests: %D", apcf.requests);
lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
if (lang == NULL) {
nxt_log(task, NXT_LOG_CRIT, "unknown application type: \"%V\"",
&apcf.type);
goto app_fail;
}
nxt_debug(task, "application language module: \"%s\"", lang->file);
ret = nxt_thread_mutex_create(&app->mutex);
if (ret != NXT_OK) {
goto app_fail;
}
nxt_queue_init(&app->ports);
nxt_queue_init(&app->requests);
nxt_queue_init(&app->pending);
app->name.length = name.length;
nxt_memcpy(app->name.start, name.start, name.length);
app->type = lang->type;
app->max_workers = apcf.workers;
app->timeout = apcf.timeout;
app->res_timeout = apcf.res_timeout * 1000000;
app->live = 1;
app->max_pending_responses = 2;
app->max_requests = apcf.requests;
app->prepare_msg = nxt_app_prepare_msg[lang->type];
nxt_queue_insert_tail(&tmcf->apps, &app->link);
nxt_router_app_use(task, app, 1);
}
http = nxt_conf_get_path(conf, &http_path);
#if 0
if (http == NULL) {
nxt_log(task, NXT_LOG_CRIT, "no \"http\" block");
return NXT_ERROR;
}
#endif
listeners = nxt_conf_get_path(conf, &listeners_path);
if (listeners == NULL) {
nxt_log(task, NXT_LOG_CRIT, "no \"listeners\" block");
return NXT_ERROR;
}
next = 0;
for ( ;; ) {
listener = nxt_conf_next_object_member(listeners, &name, &next);
if (listener == NULL) {
break;
}
skcf = nxt_router_socket_conf(task, tmcf, &name);
if (skcf == NULL) {
goto fail;
}
ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
nxt_nitems(nxt_router_listener_conf), &lscf);
if (ret != NXT_OK) {
nxt_log(task, NXT_LOG_CRIT, "listener map error");
goto fail;
}
nxt_debug(task, "application: %V", &lscf.application);
// STUB, default values if http block is not defined.
skcf->header_buffer_size = 2048;
skcf->large_header_buffer_size = 8192;
skcf->large_header_buffers = 4;
skcf->body_buffer_size = 16 * 1024;
skcf->max_body_size = 2 * 1024 * 1024;
skcf->idle_timeout = 65000;
skcf->header_read_timeout = 5000;
skcf->body_read_timeout = 5000;
skcf->send_timeout = 5000;
if (http != NULL) {
ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
nxt_nitems(nxt_router_http_conf), skcf);
if (ret != NXT_OK) {
nxt_log(task, NXT_LOG_CRIT, "http map error");
goto fail;
}
}
skcf->listen->handler = nxt_http_conn_init;
skcf->router_conf = tmcf->conf;
skcf->router_conf->count++;
skcf->application = nxt_router_listener_application(tmcf,
&lscf.application);
}
nxt_queue_add(&tmcf->deleting, &router->sockets);
nxt_queue_init(&router->sockets);
return NXT_OK;
app_fail:
nxt_free(app);
fail:
nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
nxt_queue_remove(&app->link);
nxt_thread_mutex_destroy(&app->mutex);
nxt_free(app);
} nxt_queue_loop;
return NXT_ERROR;
}
static nxt_app_t *
nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
{
nxt_app_t *app;
nxt_queue_each(app, queue, nxt_app_t, link) {
if (nxt_strstr_eq(name, &app->name)) {
return app;
}
} nxt_queue_loop;
return NULL;
}
static nxt_app_t *
nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name)
{
nxt_app_t *app;
app = nxt_router_app_find(&tmcf->apps, name);
if (app == NULL) {
app = nxt_router_app_find(&tmcf->previous, name);
}
return app;
}
static nxt_socket_conf_t *
nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_str_t *name)
{
size_t size;
nxt_int_t ret;
nxt_bool_t wildcard;
nxt_sockaddr_t *sa;
nxt_socket_conf_t *skcf;
nxt_listen_socket_t *ls;
sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
if (nxt_slow_path(sa == NULL)) {
nxt_log(task, NXT_LOG_CRIT, "invalid listener \"%V\"", name);
return NULL;
}
sa->type = SOCK_STREAM;
nxt_debug(task, "router listener: \"%*s\"",
sa->length, nxt_sockaddr_start(sa));
skcf = nxt_mp_zget(tmcf->conf->mem_pool, sizeof(nxt_socket_conf_t));
if (nxt_slow_path(skcf == NULL)) {
return NULL;
}
size = nxt_sockaddr_size(sa);
ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
if (ret != NXT_OK) {
ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
if (nxt_slow_path(ls == NULL)) {
return NULL;
}
skcf->listen = ls;
ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
nxt_memcpy(ls->sockaddr, sa, size);
nxt_listen_socket_remote_size(ls);
ls->socket = -1;
ls->backlog = NXT_LISTEN_BACKLOG;
ls->flags = NXT_NONBLOCK;
ls->read_after_accept = 1;
}
switch (sa->u.sockaddr.sa_family) {
#if (NXT_HAVE_UNIX_DOMAIN)
case AF_UNIX:
wildcard = 0;
break;
#endif
#if (NXT_INET6)
case AF_INET6:
wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
break;
#endif
case AF_INET:
default:
wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
break;
}
if (!wildcard) {
skcf->sockaddr = nxt_mp_zget(tmcf->conf->mem_pool, size);
if (nxt_slow_path(skcf->sockaddr == NULL)) {
return NULL;
}
nxt_memcpy(skcf->sockaddr, sa, size);
}
return skcf;
}
static nxt_int_t
nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
{
nxt_router_t *router;
nxt_queue_link_t *qlk;
nxt_socket_conf_t *skcf;
router = tmcf->conf->router;
for (qlk = nxt_queue_first(&router->sockets);
qlk != nxt_queue_tail(&router->sockets);
qlk = nxt_queue_next(qlk))
{
skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
nskcf->listen = skcf->listen;
nxt_queue_remove(qlk);
nxt_queue_insert_tail(&tmcf->keeping, qlk);
nxt_queue_insert_tail(&tmcf->updating, &nskcf->link);
return NXT_OK;
}
}
nxt_queue_insert_tail(&tmcf->pending, &nskcf->link);
return NXT_DECLINED;
}
static void
nxt_router_listen_socket_rpc_create(nxt_task_t *task,
nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
{
size_t size;
uint32_t stream;
nxt_buf_t *b;
nxt_port_t *main_port, *router_port;
nxt_runtime_t *rt;
nxt_socket_rpc_t *rpc;
rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
if (rpc == NULL) {
goto fail;
}
rpc->socket_conf = skcf;
rpc->temp_conf = tmcf;
size = nxt_sockaddr_size(skcf->listen->sockaddr);
b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
if (b == NULL) {
goto fail;
}
b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
rt = task->thread->runtime;
main_port = rt->port_by_type[NXT_PROCESS_MAIN];
router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
stream = nxt_port_rpc_register_handler(task, router_port,
nxt_router_listen_socket_ready,
nxt_router_listen_socket_error,
main_port->pid, rpc);
if (stream == 0) {
goto fail;
}
nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
stream, router_port->id, b);
return;
fail:
nxt_router_conf_error(task, tmcf);
}
static void
nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)
{
nxt_int_t ret;
nxt_socket_t s;
nxt_socket_rpc_t *rpc;
rpc = data;
s = msg->fd;
ret = nxt_socket_nonblocking(task, s);
if (nxt_slow_path(ret != NXT_OK)) {
goto fail;
}
nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
if (nxt_slow_path(ret != NXT_OK)) {
goto fail;
}
rpc->socket_conf->listen->socket = s;
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
nxt_router_conf_apply, task, rpc->temp_conf, NULL);
return;
fail:
nxt_socket_close(task, s);
nxt_router_conf_error(task, rpc->temp_conf);
}
static void
nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)
{
u_char *p;
size_t size;
uint8_t error;
nxt_buf_t *in, *out;
nxt_sockaddr_t *sa;
nxt_socket_rpc_t *rpc;
nxt_router_temp_conf_t *tmcf;
static nxt_str_t socket_errors[] = {
nxt_string("ListenerSystem"),
nxt_string("ListenerNoIPv6"),
nxt_string("ListenerPort"),
nxt_string("ListenerInUse"),
nxt_string("ListenerNoAddress"),
nxt_string("ListenerNoAccess"),
nxt_string("ListenerPath"),
};
rpc = data;
sa = rpc->socket_conf->listen->sockaddr;
tmcf = rpc->temp_conf;
in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);
nxt_assert(in != NULL);
p = in->mem.pos;
error = *p++;
size = sizeof("listen socket error: ") - 1
+ sizeof("{listener: \"\", code:\"\", message: \"\"}") - 1
+ sa->length + socket_errors[error].length + (in->mem.free - p);
out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
if (nxt_slow_path(out == NULL)) {
return;
}
out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
"listen socket error: "
"{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
sa->length, nxt_sockaddr_start(sa),
&socket_errors[error], in->mem.free - p, p);
nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
nxt_router_conf_error(task, tmcf);
}
static nxt_int_t
nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
{
nxt_int_t ret;
nxt_uint_t n, threads;
nxt_queue_link_t *qlk;
nxt_router_engine_conf_t *recf;
threads = tmcf->conf->threads;
tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
sizeof(nxt_router_engine_conf_t));
if (nxt_slow_path(tmcf->engines == NULL)) {
return NXT_ERROR;
}
n = 0;
for (qlk = nxt_queue_first(&router->engines);
qlk != nxt_queue_tail(&router->engines);
qlk = nxt_queue_next(qlk))
{
recf = nxt_array_zero_add(tmcf->engines);
if (nxt_slow_path(recf == NULL)) {
return NXT_ERROR;
}
recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
if (n < threads) {
recf->action = NXT_ROUTER_ENGINE_KEEP;
ret = nxt_router_engine_conf_update(tmcf, recf);
} else {
recf->action = NXT_ROUTER_ENGINE_DELETE;
ret = nxt_router_engine_conf_delete(tmcf, recf);
}
if (nxt_slow_path(ret != NXT_OK)) {
return ret;
}
n++;
}
tmcf->new_threads = n;
while (n < threads) {
recf = nxt_array_zero_add(tmcf->engines);
if (nxt_slow_path(recf == NULL)) {
return NXT_ERROR;
}
recf->action = NXT_ROUTER_ENGINE_ADD;
recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
if (nxt_slow_path(recf->engine == NULL)) {
return NXT_ERROR;
}
ret = nxt_router_engine_conf_create(tmcf, recf);
if (nxt_slow_path(ret != NXT_OK)) {
return ret;
}
n++;
}
return NXT_OK;
}
static nxt_int_t
nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
nxt_router_engine_conf_t *recf)
{
nxt_int_t ret;
ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating,
nxt_router_listen_socket_create);
if (nxt_slow_path(ret != NXT_OK)) {
return ret;
}
ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating,
nxt_router_listen_socket_create);
if (nxt_slow_path(ret != NXT_OK)) {
return ret;
}
return ret;
}
static nxt_int_t
nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
nxt_router_engine_conf_t *recf)
{
nxt_int_t ret;
ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating,
nxt_router_listen_socket_create);
if (nxt_slow_path(ret != NXT_OK)) {
return ret;
}
ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating,
nxt_router_listen_socket_update);
if (nxt_slow_path(ret != NXT_OK)) {
return ret;
}
ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting);
if (nxt_slow_path(ret != NXT_OK)) {
return ret;
}
return ret;
}
static nxt_int_t
nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
nxt_router_engine_conf_t *recf)
{
nxt_int_t ret;
ret = nxt_router_engine_quit(tmcf, recf);
if (nxt_slow_path(ret != NXT_OK)) {
return ret;
}
ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->updating);
if (nxt_slow_path(ret != NXT_OK)) {
return ret;
}
return nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting);
}
static nxt_int_t
nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
nxt_work_handler_t handler)
{
nxt_joint_job_t *job;
nxt_queue_link_t *qlk;
nxt_socket_conf_t *skcf;
nxt_socket_conf_joint_t *joint;
for (qlk = nxt_queue_first(sockets);
qlk != nxt_queue_tail(sockets);
qlk = nxt_queue_next(qlk))
{
job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
if (nxt_slow_path(job == NULL)) {
return NXT_ERROR;
}
job->work.next = recf->jobs;
recf->jobs = &job->work;
job->task = tmcf->engine->task;
job->work.handler = handler;
job->work.task = &job->task;
job->work.obj = job;
job->tmcf = tmcf;
tmcf->count++;
joint = nxt_mp_alloc(tmcf->conf->mem_pool,
sizeof(nxt_socket_conf_joint_t));
if (nxt_slow_path(joint == NULL)) {
return NXT_ERROR;
}
job->work.data = joint;
joint->count = 1;
skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
skcf->count++;
joint->socket_conf = skcf;
joint->engine = recf->engine;
}
return NXT_OK;
}
static nxt_int_t
nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
nxt_router_engine_conf_t *recf)
{
nxt_joint_job_t *job;
job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
if (nxt_slow_path(job == NULL)) {
return NXT_ERROR;
}
job->work.next = recf->jobs;
recf->jobs = &job->work;
job->task = tmcf->engine->task;
job->work.handler = nxt_router_worker_thread_quit;
job->work.task = &job->task;
job->work.obj = NULL;
job->work.data = NULL;
job->tmcf = NULL;
return NXT_OK;
}
static nxt_int_t
nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
nxt_router_engine_conf_t *recf, nxt_queue_t *sockets)
{
nxt_joint_job_t *job;
nxt_queue_link_t *qlk;
for (qlk = nxt_queue_first(sockets);
qlk != nxt_queue_tail(sockets);
qlk = nxt_queue_next(qlk))
{
job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
if (nxt_slow_path(job == NULL)) {
return NXT_ERROR;
}
job->work.next = recf->jobs;
recf->jobs = &job->work;
job->task = tmcf->engine->task;
job->work.handler = nxt_router_listen_socket_delete;
job->work.task = &job->task;
job->work.obj = job;
job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
job->tmcf = tmcf;
tmcf->count++;
}
return NXT_OK;
}
static nxt_int_t
nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
nxt_router_temp_conf_t *tmcf)
{
nxt_int_t ret;
nxt_uint_t i, threads;
nxt_router_engine_conf_t *recf;
recf = tmcf->engines->elts;
threads = tmcf->conf->threads;
for (i = tmcf->new_threads; i < threads; i++) {
ret = nxt_router_thread_create(task, rt, recf[i].engine);
if (nxt_slow_path(ret != NXT_OK)) {
return ret;
}
}
return NXT_OK;
}
static nxt_int_t
nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
nxt_event_engine_t *engine)
{
nxt_int_t ret;
nxt_thread_link_t *link;
nxt_thread_handle_t handle;
link = nxt_zalloc(sizeof(nxt_thread_link_t));
if (nxt_slow_path(link == NULL)) {
return NXT_ERROR;
}
link->start = nxt_router_thread_start;
link->engine = engine;
link->work.handler = nxt_router_thread_exit_handler;
link->work.task = task;
link->work.data = link;
nxt_queue_insert_tail(&rt->engines, &engine->link);
ret = nxt_thread_create(&handle, link);
if (nxt_slow_path(ret != NXT_OK)) {
nxt_queue_remove(&engine->link);
}
return ret;
}
static void
nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
nxt_router_temp_conf_t *tmcf)
{
nxt_app_t *app;
nxt_port_t *port;
nxt_queue_each(app, &router->apps, nxt_app_t, link) {
nxt_queue_remove(&app->link);
nxt_debug(task, "about to free app '%V' %p", &app->name, app);
app->live = 0;
do {
port = nxt_router_app_get_idle_port(app);
if (port == NULL) {
break;
}
nxt_debug(task, "port %p send quit", port);
nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0,
NULL);
nxt_port_use(task, port, -1);
} while (1);
nxt_router_app_use(task, app, -1);
} nxt_queue_loop;
nxt_queue_add(&router->apps, &tmcf->previous);
nxt_queue_add(&router->apps, &tmcf->apps);
}
static void
nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
{
nxt_uint_t n;
nxt_event_engine_t *engine;
nxt_router_engine_conf_t *recf;
recf = tmcf->engines->elts;
for (n = tmcf->engines->nelts; n != 0; n--) {
engine = recf->engine;
switch (recf->action) {
case NXT_ROUTER_ENGINE_KEEP:
break;
case NXT_ROUTER_ENGINE_ADD:
nxt_queue_insert_tail(&router->engines, &engine->link0);
break;
case NXT_ROUTER_ENGINE_DELETE:
nxt_queue_remove(&engine->link0);
break;
}
nxt_router_engine_post(engine, recf->jobs);
recf++;
}
}
static void
nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs)
{
nxt_work_t *work, *next;
for (work = jobs; work != NULL; work = next) {
next = work->next;
work->next = NULL;
nxt_event_engine_post(engine, work);
}
}
static nxt_port_handlers_t nxt_router_app_port_handlers = {
.mmap = nxt_port_mmap_handler,
.data = nxt_port_rpc_handler,
};
static void
nxt_router_thread_start(void *data)
{
nxt_int_t ret;
nxt_port_t *port;
nxt_task_t *task;
nxt_thread_t *thread;
nxt_thread_link_t *link;
nxt_event_engine_t *engine;
link = data;
engine = link->engine;
task = &engine->task;
thread = nxt_thread();
nxt_event_engine_thread_adopt(engine);
/* STUB */
thread->runtime = engine->task.thread->runtime;
engine->task.thread = thread;
engine->task.log = thread->log;
thread->engine = engine;
thread->task = &engine->task;
#if 0
thread->fiber = &engine->fibers->fiber;
#endif
engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
if (nxt_slow_path(engine->mem_pool == NULL)) {
return;
}
port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid,
NXT_PROCESS_ROUTER);
if (nxt_slow_path(port == NULL)) {
return;
}
ret = nxt_port_socket_init(task, port, 0);
if (nxt_slow_path(ret != NXT_OK)) {
nxt_port_use(task, port, -1);
return;
}
engine->port = port;
nxt_port_enable(task, port, &nxt_router_app_port_handlers);
nxt_event_engine_start(engine);
}
static void
nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
{
nxt_joint_job_t *job;
nxt_socket_conf_t *skcf;
nxt_listen_event_t *lev;
nxt_listen_socket_t *ls;
nxt_thread_spinlock_t *lock;
nxt_socket_conf_joint_t *joint;
job = obj;
joint = data;
nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link);
skcf = joint->socket_conf;
ls = skcf->listen;
lev = nxt_listen_event(task, ls);
if (nxt_slow_path(lev == NULL)) {
nxt_router_listen_socket_release(task, skcf);
return;
}
lev->socket.data = joint;
lock = &skcf->router_conf->router->lock;
nxt_thread_spin_lock(lock);
ls->count++;
nxt_thread_spin_unlock(lock);
job->work.next = NULL;
job->work.handler = nxt_router_conf_wait;
nxt_event_engine_post(job->tmcf->engine, &job->work);
}
nxt_inline nxt_listen_event_t *
nxt_router_listen_event(nxt_queue_t *listen_connections,
nxt_socket_conf_t *skcf)
{
nxt_socket_t fd;
nxt_queue_link_t *qlk;
nxt_listen_event_t *lev;
fd = skcf->listen->socket;
for (qlk = nxt_queue_first(listen_connections);
qlk != nxt_queue_tail(listen_connections);
qlk = nxt_queue_next(qlk))
{
lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link);
if (fd == lev->socket.fd) {
return lev;
}
}
return NULL;
}
static void
nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data)
{
nxt_joint_job_t *job;
nxt_event_engine_t *engine;
nxt_listen_event_t *lev;
nxt_socket_conf_joint_t *joint, *old;
job = obj;
joint = data;
engine = task->thread->engine;
nxt_queue_insert_tail(&engine->joints, &joint->link);
lev = nxt_router_listen_event(&engine->listen_connections,
joint->socket_conf);
old = lev->socket.data;
lev->socket.data = joint;
lev->listen = joint->socket_conf->listen;
job->work.next = NULL;
job->work.handler = nxt_router_conf_wait;
nxt_event_engine_post(job->tmcf->engine, &job->work);
/*
* The task is allocated from configuration temporary
* memory pool so it can be freed after engine post operation.
*/
nxt_router_conf_release(&engine->task, old);
}
static void
nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data)
{
nxt_joint_job_t *job;
nxt_socket_conf_t *skcf;
nxt_listen_event_t *lev;
nxt_event_engine_t *engine;
job = obj;
skcf = data;
engine = task->thread->engine;
lev = nxt_router_listen_event(&engine->listen_connections, skcf);
nxt_fd_event_delete(engine, &lev->socket);
nxt_debug(task, "engine %p: listen socket delete: %d", engine,
lev->socket.fd);
lev->timer.handler = nxt_router_listen_socket_close;
lev->timer.work_queue = &engine->fast_work_queue;
nxt_timer_add(engine, &lev->timer, 0);
job->work.next = NULL;
job->work.handler = nxt_router_conf_wait;
nxt_event_engine_post(job->tmcf->engine, &job->work);
}
static void
nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data)
{
nxt_event_engine_t *engine;
nxt_debug(task, "router worker thread quit");
engine = task->thread->engine;
engine->shutdown = 1;
if (nxt_queue_is_empty(&engine->joints)) {
nxt_thread_exit(task->thread);
}
}
static void
nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data)
{
nxt_timer_t *timer;
nxt_listen_event_t *lev;
nxt_socket_conf_joint_t *joint;
timer = obj;
lev = nxt_timer_data(timer, nxt_listen_event_t, timer);
joint = lev->socket.data;
nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine,
lev->socket.fd);
nxt_queue_remove(&lev->link);
/* 'task' refers to lev->task and we cannot use after nxt_free() */
task = &task->thread->engine->task;
nxt_router_listen_socket_release(task, joint->socket_conf);
nxt_free(lev);
nxt_router_conf_release(task, joint);
}
static void
nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf)
{
nxt_listen_socket_t *ls;
nxt_thread_spinlock_t *lock;
ls = skcf->listen;
lock = &skcf->router_conf->router->lock;
nxt_thread_spin_lock(lock);
nxt_debug(task, "engine %p: listen socket release: ls->count %D",
task->thread->engine, ls->count);
if (--ls->count != 0) {
ls = NULL;
}
nxt_thread_spin_unlock(lock);
if (ls != NULL) {
nxt_socket_close(task, ls->socket);
nxt_free(ls);
}
}
static void
nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
{
nxt_bool_t exit;
nxt_socket_conf_t *skcf;
nxt_router_conf_t *rtcf;
nxt_event_engine_t *engine;
nxt_thread_spinlock_t *lock;
nxt_debug(task, "conf joint %p count: %D", joint, joint->count);
if (--joint->count != 0) {
return;
}
nxt_queue_remove(&joint->link);
skcf = joint->socket_conf;
rtcf = skcf->router_conf;
lock = &rtcf->router->lock;
nxt_thread_spin_lock(lock);
nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count,
rtcf, rtcf->count);
if (--skcf->count != 0) {
rtcf = NULL;
} else {
nxt_queue_remove(&skcf->link);
if (--rtcf->count != 0) {
rtcf = NULL;
}
}
nxt_thread_spin_unlock(lock);
/* TODO remove engine->port */
/* TODO excude from connected ports */
/* The joint content can be used before memory pool destruction. */
engine = joint->engine;
exit = (engine->shutdown && nxt_queue_is_empty(&engine->joints));
if (rtcf != NULL) {
nxt_debug(task, "old router conf is destroyed");
nxt_mp_thread_adopt(rtcf->mem_pool);
nxt_mp_destroy(rtcf->mem_pool);
}
if (exit) {
nxt_thread_exit(task->thread);
}
}
static void
nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
{
nxt_port_t *port;
nxt_thread_link_t *link;
nxt_event_engine_t *engine;
nxt_thread_handle_t handle;
handle = (nxt_thread_handle_t) obj;
link = data;
nxt_thread_wait(handle);
engine = link->engine;
nxt_queue_remove(&engine->link);
port = engine->port;
// TODO notify all apps
port->engine = task->thread->engine;
nxt_mp_thread_adopt(port->mem_pool);
nxt_port_use(task, port, -1);
nxt_mp_thread_adopt(engine->mem_pool);
nxt_mp_destroy(engine->mem_pool);
nxt_event_engine_free(engine);
nxt_free(link);
}
static void
nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)
{
size_t dump_size;
nxt_int_t ret;
nxt_buf_t *b, *last;
nxt_http_request_t *r;
nxt_req_conn_link_t *rc;
nxt_app_parse_ctx_t *ar;
b = msg->buf;
rc = data;
dump_size = nxt_buf_used_size(b);
if (dump_size > 300) {
dump_size = 300;
}
nxt_debug(task, "%srouter app data (%z): %*s",
msg->port_msg.last ? "last " : "", msg->size, dump_size,
b->mem.pos);
if (msg->size == 0) {
b = NULL;
}
ar = rc->ap;
if (msg->port_msg.last != 0) {
nxt_debug(task, "router data create last buf");
last = nxt_http_request_last_buffer(task, ar->request);
if (nxt_slow_path(last == NULL)) {
nxt_app_http_req_done(task, ar);
nxt_router_rc_unlink(task, rc);
return;
}
nxt_buf_chain_add(&b, last);
nxt_router_rc_unlink(task, rc);
} else {
if (rc->app->timeout != 0) {
ar->timer.handler = nxt_router_app_timeout;
nxt_timer_add(task->thread->engine, &ar->timer, rc->app->timeout);
}
}
if (b == NULL) {
return;
}
if (msg->buf == b) {
/* Disable instant buffer completion/re-using by port. */
msg->buf = NULL;
}
r = ar->request;
if (r->header_sent) {
nxt_buf_chain_add(&r->out, b);
nxt_http_request_send_body(task, r, NULL);
} else {
ret = nxt_http_parse_fields(&ar->resp_parser, &b->mem);
if (nxt_slow_path(ret != NXT_DONE)) {
goto fail;
}
r->resp.fields = ar->resp_parser.fields;
ret = nxt_http_fields_process(r->resp.fields,
&nxt_response_fields_hash, r);
if (nxt_slow_path(ret != NXT_OK)) {
goto fail;
}
if (nxt_buf_mem_used_size(&b->mem) == 0) {
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
b->completion_handler, task, b, b->parent);
} else {
nxt_buf_chain_add(&r->out, b);
}
r->state = &nxt_http_request_send_state;
nxt_http_request_header_send(task, r);
}
return;
fail:
nxt_app_http_req_done(task, ar);
nxt_router_rc_unlink(task, rc);
nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
}
static const nxt_http_request_state_t nxt_http_request_send_state
nxt_aligned(64) =
{
.ready_handler = nxt_http_request_send_body,
.error_handler = nxt_http_request_close_handler,
};
static void
nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data)
{
nxt_buf_t *out;
nxt_http_request_t *r;
r = obj;
out = r->out;
if (out != NULL) {
r->out = NULL;
nxt_http_request_send(task, r, out);
}
}
static void
nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)
{
nxt_int_t res;
nxt_port_t *port;
nxt_bool_t cancelled;
nxt_req_app_link_t *ra;
nxt_req_conn_link_t *rc;
rc = data;
ra = rc->ra;
if (ra != NULL) {
cancelled = nxt_router_msg_cancel(task, &ra->msg_info, ra->stream);
if (cancelled) {
nxt_router_ra_inc_use(ra);
res = nxt_router_app_port(task, rc->app, ra);
if (res == NXT_OK) {
port = ra->app_port;
nxt_assert(port != NULL);
nxt_port_rpc_ex_set_peer(task, task->thread->engine->port, rc,
port->pid);
nxt_router_app_prepare_request(task, ra);
}
msg->port_msg.last = 0;
return;
}
}
nxt_http_request_error(task, rc->ap->request, NXT_HTTP_SERVICE_UNAVAILABLE);
nxt_router_rc_unlink(task, rc);
}
static void
nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)
{
nxt_app_t *app;
nxt_port_t *port;
app = data;
port = msg->u.new_port;
nxt_assert(app != NULL);
nxt_assert(port != NULL);
port->app = app;
nxt_thread_mutex_lock(&app->mutex);
nxt_assert(app->pending_workers != 0);
app->pending_workers--;
app->workers++;
nxt_thread_mutex_unlock(&app->mutex);
nxt_debug(task, "app '%V' %p new port ready", &app->name, app);
nxt_router_app_port_release(task, port, 0, 0);
}
static void
nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)
{
nxt_app_t *app;
nxt_queue_link_t *lnk;
nxt_req_app_link_t *ra;
app = data;
nxt_assert(app != NULL);
nxt_debug(task, "app '%V' %p start error", &app->name, app);
nxt_thread_mutex_lock(&app->mutex);
nxt_assert(app->pending_workers != 0);
app->pending_workers--;
if (!nxt_queue_is_empty(&app->requests)) {
lnk = nxt_queue_last(&app->requests);
nxt_queue_remove(lnk);
lnk->next = NULL;
ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_requests);
} else {
ra = NULL;
}
nxt_thread_mutex_unlock(&app->mutex);
if (ra != NULL) {
nxt_debug(task, "app '%V' %p abort next stream #%uD",
&app->name, app, ra->stream);
nxt_router_ra_error(ra, 500, "Failed to start application worker");
nxt_router_ra_use(task, ra, -1);
}
nxt_router_app_use(task, app, -1);
}
void
nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
{
int c;
c = nxt_atomic_fetch_add(&app->use_count, i);
if (i < 0 && c == -i) {
nxt_assert(app->live == 0);
nxt_assert(app->workers == 0);
nxt_assert(app->pending_workers == 0);
nxt_assert(nxt_queue_is_empty(&app->requests) != 0);
nxt_assert(nxt_queue_is_empty(&app->ports) != 0);
nxt_thread_mutex_destroy(&app->mutex);
nxt_free(app);
}
}
nxt_inline nxt_bool_t
nxt_router_app_first_port_busy(nxt_app_t *app)
{
nxt_port_t *port;
nxt_queue_link_t *lnk;
lnk = nxt_queue_first(&app->ports);
port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
return port->app_pending_responses > 0;
}
nxt_inline nxt_port_t *
nxt_router_pop_first_port(nxt_app_t *app)
{
nxt_port_t *port;
nxt_queue_link_t *lnk;
lnk = nxt_queue_first(&app->ports);
nxt_queue_remove(lnk);
port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
port->app_pending_responses++;
if ((app->max_pending_responses == 0
|| port->app_pending_responses < app->max_pending_responses)
&& (app->max_requests == 0
|| port->app_responses + port->app_pending_responses
< app->max_requests))
{
nxt_queue_insert_tail(&app->ports, lnk);
nxt_port_inc_use(port);
} else {
lnk->next = NULL;
}
return port;
}
static nxt_port_t *
nxt_router_app_get_idle_port(nxt_app_t *app)
{
nxt_port_t *port;
port = NULL;
nxt_thread_mutex_lock(&app->mutex);
nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
if (port->app_pending_responses > 0) {
port = NULL;
continue;
}
nxt_queue_remove(&port->app_link);
port->app_link.next = NULL;
break;
} nxt_queue_loop;
nxt_thread_mutex_unlock(&app->mutex);
return port;
}
static void
nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data)
{
nxt_app_t *app;
nxt_req_app_link_t *ra;
app = obj;
ra = data;
nxt_assert(app != NULL);
nxt_assert(ra != NULL);
nxt_assert(ra->app_port != NULL);
nxt_debug(task, "app '%V' %p process next stream #%uD",
&app->name, app, ra->stream);
nxt_router_app_prepare_request(task, ra);
}
static void
nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
uint32_t request_failed, uint32_t got_response)
{
nxt_app_t *app;
nxt_bool_t send_quit, cancelled;
nxt_queue_link_t *lnk;
nxt_req_app_link_t *ra, *pending_ra, *re_ra;
nxt_port_select_state_t state;
nxt_assert(port != NULL);
nxt_assert(port->app != NULL);
ra = NULL;
app = port->app;
nxt_thread_mutex_lock(&app->mutex);
port->app_pending_responses -= request_failed + got_response;
port->app_responses += got_response;
if (nxt_slow_path(app->live == 0)) {
goto app_dead;
}
if (port->pair[1] != -1
&& (app->max_pending_responses == 0
|| port->app_pending_responses < app->max_pending_responses)
&& (app->max_requests == 0
|| port->app_responses + port->app_pending_responses
< app->max_requests))
{
if (port->app_link.next == NULL) {
if (port->app_pending_responses > 0) {
nxt_queue_insert_tail(&app->ports, &port->app_link);
} else {
nxt_queue_insert_head(&app->ports, &port->app_link);
}
nxt_port_inc_use(port);
} else {
if (port->app_pending_responses == 0
&& nxt_queue_first(&app->ports) != &port->app_link)
{
nxt_queue_remove(&port->app_link);
nxt_queue_insert_head(&app->ports, &port->app_link);
}
}
}
if (!nxt_queue_is_empty(&app->ports)
&& !nxt_queue_is_empty(&app->requests))
{
lnk = nxt_queue_first(&app->requests);
nxt_queue_remove(lnk);
lnk->next = NULL;
ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_requests);
ra->app_port = nxt_router_pop_first_port(app);
if (ra->app_port->app_pending_responses > 1) {
nxt_router_ra_pending(task, app, ra);
}
}
app_dead:
/* Pop first pending request for this port. */
if ((request_failed > 0 || got_response > 0)
&& !nxt_queue_is_empty(&port->pending_requests))
{
lnk = nxt_queue_first(&port->pending_requests);
nxt_queue_remove(lnk);
lnk->next = NULL;
pending_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t,
link_port_pending);
nxt_assert(pending_ra->link_app_pending.next != NULL);
nxt_queue_remove(&pending_ra->link_app_pending);
pending_ra->link_app_pending.next = NULL;
} else {
pending_ra = NULL;
}
/* Try to cancel and re-schedule first stalled request for this app. */
if (got_response > 0 && !nxt_queue_is_empty(&app->pending)) {
lnk = nxt_queue_first(&app->pending);
re_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_pending);
if (re_ra->res_time <= nxt_thread_monotonic_time(task->thread)) {
nxt_debug(task, "app '%V' stalled request #%uD detected",
&app->name, re_ra->stream);
cancelled = nxt_router_msg_cancel(task, &re_ra->msg_info,
re_ra->stream);
if (cancelled) {
nxt_router_ra_inc_use(re_ra);
state.ra = re_ra;
state.app = app;
nxt_router_port_select(task, &state);
goto re_ra_cancelled;
}
}
}
re_ra = NULL;
re_ra_cancelled:
send_quit = (app->live == 0 && port->app_pending_responses > 0)
|| (app->max_requests > 0 && port->app_pending_responses == 0
&& port->app_responses >= app->max_requests);
nxt_thread_mutex_unlock(&app->mutex);
if (pending_ra != NULL) {
nxt_router_ra_use(task, pending_ra, -1);
}
if (re_ra != NULL) {
if (nxt_router_port_post_select(task, &state) == NXT_OK) {
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
nxt_router_app_process_request,
&task->thread->engine->task, app, re_ra);
}
}
if (ra != NULL) {
nxt_router_ra_use(task, ra, -1);
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
nxt_router_app_process_request,
&task->thread->engine->task, app, ra);
goto adjust_use;
}
/* ? */
if (port->pair[1] == -1) {
nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)",
&app->name, app, port, port->pid);
goto adjust_use;
}
if (send_quit) {
nxt_debug(task, "app '%V' %p is not alive, send QUIT to port",
&app->name, app);
nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
-1, 0, 0, NULL);
goto adjust_use;
}
nxt_debug(task, "app '%V' %p requests queue is empty, keep the port",
&app->name, app);
adjust_use:
if (request_failed > 0 || got_response > 0) {
nxt_port_use(task, port, -1);
}
}
void
nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
{
nxt_app_t *app;
nxt_bool_t unchain, start_worker;
app = port->app;
nxt_assert(app != NULL);
nxt_thread_mutex_lock(&app->mutex);
unchain = port->app_link.next != NULL;
if (unchain) {
nxt_queue_remove(&port->app_link);
port->app_link.next = NULL;
}
app->workers--;
start_worker = app->live != 0
&& nxt_queue_is_empty(&app->requests) == 0
&& app->workers + app->pending_workers < app->max_workers;
if (start_worker) {
app->pending_workers++;
}
nxt_thread_mutex_unlock(&app->mutex);
nxt_debug(task, "app '%V' %p port %p close", &app->name, app, port);
if (unchain) {
nxt_port_use(task, port, -1);
}
if (start_worker) {
nxt_router_start_worker(task, app);
}
}
static void
nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
{
nxt_app_t *app;
nxt_req_app_link_t *ra;
ra = state->ra;
app = state->app;
state->failed_port_use_delta = 0;
if (nxt_queue_chk_remove(&ra->link_app_requests))
{
nxt_router_ra_dec_use(ra);
}
if (nxt_queue_chk_remove(&ra->link_port_pending))
{
nxt_assert(ra->link_app_pending.next != NULL);
nxt_queue_remove(&ra->link_app_pending);
ra->link_app_pending.next = NULL;
nxt_router_ra_dec_use(ra);
}
state->failed_port = ra->app_port;
if (ra->app_port != NULL) {
state->failed_port_use_delta--;
state->failed_port->app_pending_responses--;
if (nxt_queue_chk_remove(&state->failed_port->app_link)) {
state->failed_port_use_delta--;
}
ra->app_port = NULL;
}
state->can_start_worker = (app->workers + app->pending_workers)
< app->max_workers;
state->port = NULL;
if (nxt_queue_is_empty(&app->ports)
|| (state->can_start_worker && nxt_router_app_first_port_busy(app)) )
{
ra = nxt_router_ra_create(task, ra);
if (nxt_slow_path(ra == NULL)) {
goto fail;
}
if (nxt_slow_path(state->failed_port != NULL)) {
nxt_queue_insert_head(&app->requests, &ra->link_app_requests);
} else {
nxt_queue_insert_tail(&app->requests, &ra->link_app_requests);
}
nxt_router_ra_inc_use(ra);
nxt_debug(task, "ra stream #%uD enqueue to app->requests", ra->stream);
if (state->can_start_worker) {
app->pending_workers++;
}
} else {
state->port = nxt_router_pop_first_port(app);
if (state->port->app_pending_responses > 1) {
ra = nxt_router_ra_create(task, ra);
if (nxt_slow_path(ra == NULL)) {
goto fail;
}
ra->app_port = state->port;
nxt_router_ra_pending(task, app, ra);
}
}
fail:
state->shared_ra = ra;
}
static nxt_int_t
nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state)
{
nxt_int_t res;
nxt_app_t *app;
nxt_req_app_link_t *ra;
ra = state->shared_ra;
app = state->app;
if (state->failed_port_use_delta != 0) {
nxt_port_use(task, state->failed_port, state->failed_port_use_delta);
}
if (nxt_slow_path(ra == NULL)) {
if (state->port != NULL) {
nxt_port_use(task, state->port, -1);
}
nxt_router_ra_error(state->ra, 500,
"Failed to allocate shared req<->app link");
nxt_router_ra_use(task, state->ra, -1);
return NXT_ERROR;
}
if (state->port != NULL) {
nxt_debug(task, "already have port for app '%V' %p ", &app->name, app);
ra->app_port = state->port;
return NXT_OK;
}
if (!state->can_start_worker) {
nxt_debug(task, "app '%V' %p too many running or pending workers",
&app->name, app);
return NXT_AGAIN;
}
res = nxt_router_start_worker(task, app);
if (nxt_slow_path(res != NXT_OK)) {
nxt_router_ra_error(ra, 500, "Failed to start worker");
nxt_router_ra_use(task, ra, -1);
return NXT_ERROR;
}
return NXT_AGAIN;
}
static nxt_int_t
nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra)
{
nxt_port_select_state_t state;
state.ra = ra;
state.app = app;
nxt_thread_mutex_lock(&app->mutex);
nxt_router_port_select(task, &state);
nxt_thread_mutex_unlock(&app->mutex);
return nxt_router_port_post_select(task, &state);
}
void
nxt_router_process_http_request(nxt_task_t *task, nxt_app_parse_ctx_t *ar)
{
nxt_int_t res;
nxt_app_t *app;
nxt_port_t *port;
nxt_event_engine_t *engine;
nxt_http_request_t *r;
nxt_req_app_link_t ra_local, *ra;
nxt_req_conn_link_t *rc;
r = ar->request;
app = r->socket_conf->application;
if (app == NULL) {
nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
return;
}
engine = task->thread->engine;
rc = nxt_port_rpc_register_handler_ex(task, engine->port,
nxt_router_response_ready_handler,
nxt_router_response_error_handler,
sizeof(nxt_req_conn_link_t));
if (nxt_slow_path(rc == NULL)) {
nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
return;
}
rc->stream = nxt_port_rpc_ex_stream(rc);
rc->app = app;
nxt_router_app_use(task, app, 1);
rc->ap = ar;
ra = &ra_local;
nxt_router_ra_init(task, ra, rc);
res = nxt_router_app_port(task, app, ra);
if (res != NXT_OK) {
return;
}
ra = rc->ra;
port = ra->app_port;
nxt_assert(port != NULL);
nxt_port_rpc_ex_set_peer(task, engine->port, rc, port->pid);
nxt_router_app_prepare_request(task, ra);
}
static void
nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, void *data)
{
}
static void
nxt_router_app_prepare_request(nxt_task_t *task, nxt_req_app_link_t *ra)
{
uint32_t request_failed;
nxt_buf_t *b;
nxt_int_t res;
nxt_port_t *port, *c_port, *reply_port;
nxt_app_wmsg_t wmsg;
nxt_app_parse_ctx_t *ap;
nxt_assert(ra->app_port != NULL);
port = ra->app_port;
reply_port = ra->reply_port;
ap = ra->ap;
request_failed = 1;
c_port = nxt_process_connected_port_find(port->process, reply_port->pid,
reply_port->id);
if (nxt_slow_path(c_port != reply_port)) {
res = nxt_port_send_port(task, port, reply_port, 0);
if (nxt_slow_path(res != NXT_OK)) {
nxt_router_ra_error(ra, 500,
"Failed to send reply port to application");
goto release_port;
}
nxt_process_connected_port_add(port->process, reply_port);
}
wmsg.port = port;
wmsg.write = NULL;
wmsg.buf = &wmsg.write;
wmsg.stream = ra->stream;
res = port->app->prepare_msg(task, &ap->r, &wmsg);
if (nxt_slow_path(res != NXT_OK)) {
nxt_router_ra_error(ra, 500,
"Failed to prepare message for application");
goto release_port;
}
nxt_debug(task, "about to send %d bytes buffer to worker port %d",
nxt_buf_used_size(wmsg.write),
wmsg.port->socket.fd);
request_failed = 0;
ra->msg_info.buf = wmsg.write;
ra->msg_info.completion_handler = wmsg.write->completion_handler;
for (b = wmsg.write; b != NULL; b = b->next) {
b->completion_handler = nxt_router_dummy_buf_completion;
}
res = nxt_port_mmap_get_tracking(task, port, &ra->msg_info.tracking,
ra->stream);
if (nxt_slow_path(res != NXT_OK)) {
nxt_router_ra_error(ra, 500,
"Failed to get tracking area");
goto release_port;
}
res = nxt_port_socket_twrite(task, wmsg.port, NXT_PORT_MSG_DATA,
-1, ra->stream, reply_port->id, wmsg.write,
&ra->msg_info.tracking);
if (nxt_slow_path(res != NXT_OK)) {
nxt_router_ra_error(ra, 500,
"Failed to send message to application");
goto release_port;
}
release_port:
nxt_router_app_port_release(task, port, request_failed, 0);
nxt_router_ra_update_peer(task, ra);
}
static nxt_int_t
nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
nxt_app_wmsg_t *wmsg)
{
nxt_int_t rc;
nxt_buf_t *b;
nxt_http_field_t *field;
nxt_app_request_header_t *h;
static const nxt_str_t prefix = nxt_string("HTTP_");
static const nxt_str_t eof = nxt_null_string;
h = &r->header;
#define RC(S) \
do { \
rc = (S); \
if (nxt_slow_path(rc != NXT_OK)) { \
goto fail; \
} \
} while(0)
#define NXT_WRITE(N) \
RC(nxt_app_msg_write_str(task, wmsg, N))
/* TODO error handle, async mmap buffer assignment */
NXT_WRITE(&h->method);
NXT_WRITE(&h->target);
if (h->path.start == h->target.start) {
NXT_WRITE(&eof);
} else {
NXT_WRITE(&h->path);
}
if (h->query.start != NULL) {
RC(nxt_app_msg_write_size(task, wmsg,
h->query.start - h->target.start + 1));
} else {
RC(nxt_app_msg_write_size(task, wmsg, 0));
}
NXT_WRITE(&h->version);
NXT_WRITE(&r->remote);
NXT_WRITE(&r->local);
NXT_WRITE(&h->host);
NXT_WRITE(&h->content_type);
NXT_WRITE(&h->content_length);
nxt_list_each(field, h->fields) {
RC(nxt_app_msg_write_prefixed_upcase(task, wmsg, &prefix, field->name,
field->name_length));
RC(nxt_app_msg_write(task, wmsg, field->value, field->value_length));
} nxt_list_loop;
/* end-of-headers mark */
NXT_WRITE(&eof);
RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size));
for(b = r->body.buf; b != NULL; b = b->next) {
RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos,
nxt_buf_mem_used_size(&b->mem)));
}
#undef NXT_WRITE
#undef RC
return NXT_OK;
fail:
return NXT_ERROR;
}
static nxt_int_t
nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
nxt_app_wmsg_t *wmsg)
{
nxt_int_t rc;
nxt_buf_t *b;
nxt_bool_t method_is_post;
nxt_http_field_t *field;
nxt_app_request_header_t *h;
static const nxt_str_t prefix = nxt_string("HTTP_");
static const nxt_str_t eof = nxt_null_string;
h = &r->header;
#define RC(S) \
do { \
rc = (S); \
if (nxt_slow_path(rc != NXT_OK)) { \
goto fail; \
} \
} while(0)
#define NXT_WRITE(N) \
RC(nxt_app_msg_write_str(task, wmsg, N))
/* TODO error handle, async mmap buffer assignment */
NXT_WRITE(&h->method);
NXT_WRITE(&h->target);
if (h->path.start == h->target.start) {
NXT_WRITE(&eof);
} else {
NXT_WRITE(&h->path);
}
if (h->query.start != NULL) {
RC(nxt_app_msg_write_size(task, wmsg,
h->query.start - h->target.start + 1));
} else {
RC(nxt_app_msg_write_size(task, wmsg, 0));
}
NXT_WRITE(&h->version);
// PHP_SELF
// SCRIPT_NAME
// SCRIPT_FILENAME
// DOCUMENT_ROOT
NXT_WRITE(&r->remote);
NXT_WRITE(&r->local);
NXT_WRITE(&h->host);
NXT_WRITE(&h->cookie);
NXT_WRITE(&h->content_type);
NXT_WRITE(&h->content_length);
RC(nxt_app_msg_write_size(task, wmsg, h->parsed_content_length));
RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size));
method_is_post = h->method.length == 4
&& h->method.start[0] == 'P'
&& h->method.start[1] == 'O'
&& h->method.start[2] == 'S'
&& h->method.start[3] == 'T';
if (method_is_post) {
for(b = r->body.buf; b != NULL; b = b->next) {
RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos,
nxt_buf_mem_used_size(&b->mem)));
}
}
nxt_list_each(field, h->fields) {
RC(nxt_app_msg_write_prefixed_upcase(task, wmsg, &prefix, field->name,
field->name_length));
RC(nxt_app_msg_write(task, wmsg, field->value, field->value_length));
} nxt_list_loop;
/* end-of-headers mark */
NXT_WRITE(&eof);
if (!method_is_post) {
for(b = r->body.buf; b != NULL; b = b->next) {
RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos,
nxt_buf_mem_used_size(&b->mem)));
}
}
#undef NXT_WRITE
#undef RC
return NXT_OK;
fail:
return NXT_ERROR;
}
static nxt_int_t
nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, nxt_app_wmsg_t *wmsg)
{
nxt_int_t rc;
nxt_buf_t *b;
nxt_http_field_t *field;
nxt_app_request_header_t *h;
static const nxt_str_t eof = nxt_null_string;
h = &r->header;
#define RC(S) \
do { \
rc = (S); \
if (nxt_slow_path(rc != NXT_OK)) { \
goto fail; \
} \
} while(0)
#define NXT_WRITE(N) \
RC(nxt_app_msg_write_str(task, wmsg, N))
/* TODO error handle, async mmap buffer assignment */
NXT_WRITE(&h->method);
NXT_WRITE(&h->target);
if (h->path.start == h->target.start) {
NXT_WRITE(&eof);
} else {
NXT_WRITE(&h->path);
}
if (h->query.start != NULL) {
RC(nxt_app_msg_write_size(task, wmsg,
h->query.start - h->target.start + 1));
} else {
RC(nxt_app_msg_write_size(task, wmsg, 0));
}
NXT_WRITE(&h->version);
NXT_WRITE(&r->remote);
NXT_WRITE(&h->host);
NXT_WRITE(&h->cookie);
NXT_WRITE(&h->content_type);
NXT_WRITE(&h->content_length);
RC(nxt_app_msg_write_size(task, wmsg, h->parsed_content_length));
nxt_list_each(field, h->fields) {
RC(nxt_app_msg_write(task, wmsg, field->name, field->name_length));
RC(nxt_app_msg_write(task, wmsg, field->value, field->value_length));
} nxt_list_loop;
/* end-of-headers mark */
NXT_WRITE(&eof);
RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size));
for(b = r->body.buf; b != NULL; b = b->next) {
RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos,
nxt_buf_mem_used_size(&b->mem)));
}
#undef NXT_WRITE
#undef RC
return NXT_OK;
fail:
return NXT_ERROR;
}
const nxt_conn_state_t nxt_router_conn_close_state
nxt_aligned(64) =
{
.ready_handler = nxt_router_conn_free,
};
static void
nxt_router_conn_mp_cleanup(nxt_task_t *task, void *obj, void *data)
{
nxt_socket_conf_joint_t *joint;
joint = obj;
nxt_router_conf_release(task, joint);
}
static void
nxt_router_conn_free(nxt_task_t *task, void *obj, void *data)
{
nxt_conn_t *c;
nxt_event_engine_t *engine;
nxt_socket_conf_joint_t *joint;
c = obj;
nxt_debug(task, "router conn close done");
nxt_queue_remove(&c->link);
engine = task->thread->engine;
nxt_sockaddr_cache_free(engine, c);
joint = c->joint;
nxt_mp_cleanup(c->mem_pool, nxt_router_conn_mp_cleanup,
&engine->task, joint, NULL);
nxt_conn_free(task, c);
}
static void
nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
{
nxt_timer_t *timer;
nxt_app_parse_ctx_t *ar;
timer = obj;
nxt_debug(task, "router app timeout");
ar = nxt_timer_data(timer, nxt_app_parse_ctx_t, timer);
if (!ar->request->header_sent) {
nxt_http_request_error(task, ar->request, NXT_HTTP_SERVICE_UNAVAILABLE);
}
}