summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_router.c
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2017-07-12 20:32:16 +0300
committerMax Romanov <max.romanov@nginx.com>2017-07-12 20:32:16 +0300
commitb0c1e740cf404f8fed5eed75fddb205ca74314e0 (patch)
tree08dcefc827c5dfb1570b682ea8d1e9abf17a31dc /src/nxt_router.c
parentc38bcb7d70729434893ae4d5f2f58a78a36d2bd5 (diff)
downloadunit-b0c1e740cf404f8fed5eed75fddb205ca74314e0.tar.gz
unit-b0c1e740cf404f8fed5eed75fddb205ca74314e0.tar.bz2
New process port exchange changed. READY message type introduced.
Application process start request DATA message from router to master. Master notifies router via NEW_PORT message after worker process become ready.
Diffstat (limited to 'src/nxt_router.c')
-rw-r--r--src/nxt_router.c662
1 files changed, 531 insertions, 131 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c
index b54d1cf3..fa43b09f 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -20,6 +20,19 @@ typedef struct {
} nxt_router_listener_conf_t;
+typedef struct nxt_start_worker_s nxt_start_worker_t;
+
+struct nxt_start_worker_s {
+ uint32_t stream;
+ nxt_app_t *app;
+ nxt_req_conn_link_t *rc;
+ nxt_mp_t *mem_pool;
+ void *joint;
+
+ nxt_work_t work;
+};
+
+
static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
static nxt_int_t nxt_router_conf_new(nxt_task_t *task,
nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
@@ -87,11 +100,22 @@ static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
static void nxt_router_conf_release(nxt_task_t *task,
nxt_socket_conf_joint_t *joint);
+static nxt_port_t * nxt_router_app_get_port(nxt_app_t *app);
+static void nxt_router_app_release_port(nxt_task_t *task, void *obj,
+ void *data);
+
+static void nxt_router_sw_add(nxt_task_t *task, nxt_router_t *router,
+ nxt_start_worker_t *sw);
+static nxt_start_worker_t *nxt_router_sw_find_remove(nxt_task_t *task,
+ nxt_router_t *router, uint32_t id);
+
static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj,
void *data);
static void nxt_router_process_http_request(nxt_task_t *task,
nxt_conn_t *c, nxt_app_parse_ctx_t *ap);
+static void nxt_router_process_http_request_mp(nxt_task_t *task,
+ nxt_req_conn_link_t *rc, nxt_mp_t *mp);
static void nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data);
@@ -99,13 +123,19 @@ static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data);
static nxt_msec_t nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data);
+static void nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code,
+ const char* fmt, ...);
+
static nxt_router_t *nxt_router;
nxt_int_t
-nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt)
+nxt_router_start(nxt_task_t *task, void *data)
{
- nxt_int_t ret;
- nxt_router_t *router;
+ nxt_int_t ret;
+ nxt_router_t *router;
+ nxt_runtime_t *rt;
+
+ rt = task->thread->runtime;
ret = nxt_app_http_init(task, rt);
if (nxt_slow_path(ret != NXT_OK)) {
@@ -127,6 +157,51 @@ nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt)
}
+static void
+nxt_router_sw_release(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_start_worker_t *sw;
+ nxt_socket_conf_joint_t *joint;
+
+ sw = obj;
+ joint = sw->joint;
+
+ nxt_debug(task, "sw #%uxD release", sw->stream);
+
+ if (nxt_mp_release(sw->mem_pool, sw) == 0) {
+ nxt_router_conf_release(task, joint);
+ }
+}
+
+
+void
+nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
+{
+ nxt_start_worker_t *sw;
+
+ nxt_port_new_port_handler(task, msg);
+
+ if (msg->new_port == NULL || msg->new_port->type != NXT_PROCESS_WORKER) {
+ return;
+ }
+
+ sw = nxt_router_sw_find_remove(task, nxt_router, msg->port_msg.stream);
+
+ if (nxt_fast_path(sw != NULL)) {
+ msg->new_port->app = sw->app;
+
+ nxt_router_app_release_port(task, msg->new_port, sw->app);
+
+ sw->work.handler = nxt_router_sw_release;
+
+ nxt_debug(task, "post sw #%uxD release to %p", sw->stream,
+ sw->work.data);
+
+ nxt_event_engine_post(sw->work.data, &sw->work);
+ }
+}
+
+
void
nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
@@ -412,7 +487,6 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_app_t *app, *prev;
nxt_app_type_t type;
nxt_sockaddr_t *sa;
- nxt_queue_link_t *qlk, *nqlk;
nxt_conf_value_t *conf, *http;
nxt_conf_value_t *applications, *application;
nxt_conf_value_t *listeners, *listener;
@@ -496,19 +570,15 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_debug(task, "application type: %V", &apcf.type);
nxt_debug(task, "application workers: %D", apcf.workers);
- if (nxt_str_eq(&apcf.type, "python", 6)) {
- type = NXT_APP_PYTHON;
-
- } else if (nxt_str_eq(&apcf.type, "php", 3)) {
- type = NXT_APP_PHP;
+ type = nxt_app_parse_type(&apcf.type);
- } else if (nxt_str_eq(&apcf.type, "ruby", 4)) {
- type = NXT_APP_RUBY;
-
- } else if (nxt_str_eq(&apcf.type, "go", 2)) {
- type = NXT_APP_GO;
+ if (type == NXT_APP_UNKNOWN) {
+ nxt_log(task, NXT_LOG_CRIT, "unknown application type: \"%V\"",
+ &apcf.type);
+ goto app_fail;
+ }
- } else {
+ if (nxt_app_modules[type] == NULL) {
nxt_log(task, NXT_LOG_CRIT, "unsupported application type: \"%V\"",
&apcf.type);
goto app_fail;
@@ -519,10 +589,14 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
goto app_fail;
}
+ nxt_queue_init(&app->ports);
+ nxt_queue_init(&app->requests);
+
app->name = name;
app->type = type;
app->max_workers = apcf.workers;
app->live = 1;
+ app->module = nxt_app_modules[type];
nxt_queue_insert_tail(&tmcf->apps, &app->link);
}
@@ -607,16 +681,13 @@ app_fail:
fail:
- for (qlk = nxt_queue_first(&tmcf->apps);
- qlk != nxt_queue_tail(&tmcf->apps);
- qlk = nqlk)
- {
- nqlk = nxt_queue_next(qlk);
- app = nxt_queue_link_data(qlk, nxt_app_t, link);
+ nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
+ nxt_queue_remove(&app->link);
nxt_thread_mutex_destroy(&app->mutex);
nxt_free(app);
- }
+
+ } nxt_queue_loop;
return NXT_ERROR;
}
@@ -625,19 +696,15 @@ fail:
static nxt_app_t *
nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
{
- nxt_app_t *app;
- nxt_queue_link_t *qlk;
+ nxt_app_t *app;
- for (qlk = nxt_queue_first(queue);
- qlk != nxt_queue_tail(queue);
- qlk = nxt_queue_next(qlk))
- {
- app = nxt_queue_link_data(qlk, nxt_app_t, link);
+ nxt_queue_each(app, queue, nxt_app_t, link) {
if (nxt_strstr_eq(name, &app->name)) {
return app;
}
- }
+
+ } nxt_queue_loop;
return NULL;
}
@@ -1088,8 +1155,6 @@ nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
nxt_event_engine_t *engine)
{
nxt_int_t ret;
- nxt_port_t *port;
- nxt_process_t *process;
nxt_thread_link_t *link;
nxt_thread_handle_t handle;
@@ -1107,28 +1172,6 @@ nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
nxt_queue_insert_tail(&rt->engines, &engine->link);
- process = nxt_runtime_process_find(rt, nxt_pid);
- if (nxt_slow_path(process == NULL)) {
- return NXT_ERROR;
- }
-
- port = nxt_process_port_new(process);
- if (nxt_slow_path(port == NULL)) {
- return NXT_ERROR;
- }
-
- ret = nxt_port_socket_init(task, port, 0);
- if (nxt_slow_path(ret != NXT_OK)) {
- return ret;
- }
-
- port->engine = 0;
- port->type = NXT_PROCESS_ROUTER;
-
- engine->port = port;
-
- nxt_runtime_port_add(rt, port);
-
ret = nxt_thread_create(&handle, link);
if (nxt_slow_path(ret != NXT_OK)) {
@@ -1142,20 +1185,14 @@ nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
static void
nxt_router_apps_sort(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
{
- nxt_app_t *app;
- nxt_queue_link_t *qlk, *nqlk;
+ nxt_app_t *app;
- for (qlk = nxt_queue_first(&router->apps);
- qlk != nxt_queue_tail(&router->apps);
- qlk = nqlk)
- {
- nqlk = nxt_queue_next(qlk);
- app = nxt_queue_link_data(qlk, nxt_app_t, link);
+ nxt_queue_each(app, &router->apps, nxt_app_t, link) {
nxt_queue_remove(&app->link);
- // RELEASE APP
- }
+ // TODO RELEASE APP
+ } nxt_queue_loop;
nxt_queue_add(&router->apps, &tmcf->previous);
nxt_queue_add(&router->apps, &tmcf->apps);
@@ -1232,6 +1269,8 @@ static nxt_port_handler_t nxt_router_app_port_handlers[] = {
static void
nxt_router_thread_start(void *data)
{
+ nxt_int_t ret;
+ nxt_port_t *port;
nxt_task_t *task;
nxt_thread_t *thread;
nxt_thread_link_t *link;
@@ -1252,12 +1291,26 @@ nxt_router_thread_start(void *data)
thread->task = &engine->task;
thread->fiber = &engine->fibers->fiber;
- nxt_mp_thread_adopt(engine->port->mem_pool);
+ engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
+
+ port = nxt_mp_zalloc(engine->mem_pool, sizeof(nxt_port_t));
+ if (nxt_slow_path(port == NULL)) {
+ return;
+ }
+
+ port->id = nxt_port_get_next_id();
+ port->pid = nxt_pid;
+
+ ret = nxt_port_socket_init(task, port, 0);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return;
+ }
- engine->port->socket.task = task;
- nxt_port_create(task, engine->port, nxt_router_app_port_handlers);
+ port->type = NXT_PROCESS_ROUTER;
- engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
+ engine->port = port;
+
+ nxt_port_enable(task, port, nxt_router_app_port_handlers);
nxt_event_engine_start(engine);
}
@@ -1456,6 +1509,9 @@ nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
nxt_thread_spin_unlock(lock);
+ /* TODO remove engine->port */
+ /* TODO excude from connected ports */
+
if (rtcf != NULL) {
nxt_debug(task, "old router conf is destroyed");
@@ -1473,6 +1529,7 @@ nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
static void
nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
{
+ nxt_port_t *port;
nxt_thread_link_t *link;
nxt_event_engine_t *engine;
nxt_thread_handle_t handle;
@@ -1486,13 +1543,27 @@ nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
nxt_queue_remove(&engine->link);
+ port = engine->port;
+
+ // TODO notify all apps
+
+ if (port->pair[0] != -1) {
+ nxt_fd_close(port->pair[0]);
+ }
+
+ if (port->pair[1] != -1) {
+ nxt_fd_close(port->pair[1]);
+ }
+
+ if (port->mem_pool) {
+ nxt_mp_destroy(port->mem_pool);
+ }
+
nxt_mp_destroy(engine->mem_pool);
nxt_event_engine_free(engine);
nxt_free(link);
-
- // TODO: free port
}
@@ -1616,28 +1687,265 @@ nxt_router_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
}
}
+nxt_inline const char *
+nxt_router_text_by_code(int code)
+{
+ switch (code) {
+ case 400: return "Bad request";
+ case 404: return "Not found";
+ case 403: return "Forbidden";
+ case 500:
+ default: return "Internal server error";
+ }
+}
-nxt_inline nxt_port_t *
-nxt_router_app_port(nxt_task_t *task)
+static void
+nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code,
+ const char* fmt, ...)
{
- nxt_port_t *port;
- nxt_runtime_t *rt;
+ va_list args;
+ nxt_buf_t *b, *last;
+ const char *msg;
+
+ b = nxt_buf_mem_alloc(c->mem_pool, 16384, 0);
+ if (nxt_slow_path(b == NULL)) {
+ /* TODO pogorevaTb */
+ }
+
+ b->mem.free = nxt_sprintf(b->mem.free, b->mem.end,
+ "HTTP/1.0 %d %s\r\n"
+ "Content-Type: text/plain\r\n"
+ "Connection: close\r\n\r\n",
+ code, nxt_router_text_by_code(code));
+
+ msg = (const char *) b->mem.free;
+
+ va_start(args, fmt);
+ b->mem.free = nxt_vsprintf(b->mem.free, b->mem.end, fmt, args);
+ va_end(args);
+
+ nxt_log_alert(task->log, "error %d: %s", code, msg);
+
+ last = nxt_buf_sync_alloc(c->mem_pool, NXT_BUF_SYNC_LAST);
+ if (nxt_slow_path(last == NULL)) {
+ /* TODO pogorevaTb */
+ }
+
+ nxt_buf_chain_add(&b, last);
+
+ if (c->write == NULL) {
+ c->write = b;
+ c->write_state = &nxt_router_conn_write_state;
+
+ nxt_conn_write(task->thread->engine, c);
+ } else {
+ nxt_debug(task, "router data attach out bufs to existing chain");
+
+ nxt_buf_chain_add(&c->write, b);
+ }
+}
+
+
+static void
+nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_buf_t *b;
+ nxt_app_t *app;
+ nxt_port_t *port;
+ nxt_runtime_t *rt;
+ nxt_start_worker_t *sw;
+
+ sw = obj;
+ app = sw->app;
+
+ nxt_debug(task, "send sw #%uD", sw->stream);
+
+ nxt_router_sw_add(task, nxt_router, sw);
+ nxt_queue_insert_tail(&app->requests, &sw->rc->app_link);
rt = task->thread->runtime;
+ port = rt->port_by_type[NXT_PROCESS_MASTER];
- nxt_runtime_port_each(rt, port) {
+ b = nxt_buf_mem_alloc(port->mem_pool, app->conf.length, 0);
- if (nxt_pid == port->pid) {
- continue;
- }
+ nxt_buf_cpystr(b, &app->conf);
- if (port->type == NXT_PROCESS_WORKER) {
- return port;
- }
+ nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA, -1, sw->stream, 0, b);
+}
- } nxt_runtime_port_loop;
- return NULL;
+static nxt_port_t *
+nxt_router_app_get_port(nxt_app_t *app)
+{
+ nxt_port_t *port;
+ nxt_queue_link_t *lnk;
+
+ port = NULL;
+
+ nxt_thread_mutex_lock(&app->mutex);
+
+ if (!nxt_queue_is_empty(&app->ports)) {
+ lnk = nxt_queue_first(&app->ports);
+ nxt_queue_remove(lnk);
+
+ lnk->next = NULL;
+
+ port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
+ }
+
+ nxt_thread_mutex_unlock(&app->mutex);
+
+ return port;
+}
+
+
+static void
+nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_app_t *app;
+ nxt_port_t *port;
+ nxt_work_t *work;
+ nxt_queue_link_t *lnk;
+ nxt_req_conn_link_t *rc;
+
+ port = obj;
+ app = data;
+
+ nxt_assert(app != NULL);
+ nxt_assert(app == port->app);
+ nxt_assert(port->app_link.next == NULL);
+
+
+ if (task->thread->engine != port->engine) {
+ work = (nxt_work_t *) (port + 1);
+
+ nxt_debug(task, "post release port to engine %p", port->engine);
+
+ work->next = NULL;
+ work->handler = nxt_router_app_release_port;
+ work->task = port->socket.task;
+ work->obj = port;
+ work->data = app;
+
+ nxt_event_engine_post(port->engine, work);
+
+ return;
+ }
+
+ if (!nxt_queue_is_empty(&app->requests)) {
+ lnk = nxt_queue_first(&app->requests);
+ nxt_queue_remove(lnk);
+
+ rc = nxt_queue_link_data(lnk, nxt_req_conn_link_t, app_link);
+
+ nxt_debug(task, "process request #%uxD", rc->req_id);
+
+ rc->app_port = port;
+
+ nxt_router_process_http_request_mp(task, rc, rc->app_port->mem_pool);
+
+ return;
+ }
+
+ nxt_debug(task, "app requests queue is empty");
+
+ nxt_thread_mutex_lock(&app->mutex);
+
+ nxt_queue_insert_head(&app->ports, &port->app_link);
+
+ nxt_thread_mutex_unlock(&app->mutex);
+}
+
+
+void
+nxt_router_app_remove_port(nxt_port_t *port)
+{
+ nxt_app_t *app;
+
+ if (port->app_link.next == NULL) {
+ return;
+ }
+
+ app = port->app;
+
+#if (NXT_DEBUG)
+ if (nxt_slow_path(app == NULL)) {
+ nxt_abort();
+ }
+#endif
+
+ nxt_thread_mutex_lock(&app->mutex);
+
+ nxt_queue_remove(&port->app_link);
+ port->app_link.next = NULL;
+
+ nxt_thread_mutex_unlock(&app->mutex);
+}
+
+
+nxt_inline nxt_int_t
+nxt_router_app_port(nxt_task_t *task, nxt_req_conn_link_t *rc)
+{
+ nxt_app_t *app;
+ nxt_conn_t *c;
+ nxt_port_t *port, *master_port;
+ nxt_runtime_t *rt;
+ nxt_start_worker_t *sw;
+ nxt_socket_conf_joint_t *joint;
+
+ port = NULL;
+ c = rc->conn;
+
+ joint = c->listen->socket.data;
+ app = joint->socket_conf->application;
+
+
+ if (app == NULL) {
+ nxt_router_gen_error(task, rc->conn, 500,
+ "Application is NULL in socket_conf");
+ return NXT_ERROR;
+ }
+
+
+ port = nxt_router_app_get_port(app);
+
+ if (port != NULL) {
+ rc->app_port = port;
+ return NXT_OK;
+ }
+
+
+ sw = nxt_mp_retain(c->mem_pool, sizeof(nxt_start_worker_t));
+
+ if (nxt_slow_path(sw == NULL)) {
+ nxt_router_gen_error(task, rc->conn, 500,
+ "Failed to allocate start worker struct");
+ return NXT_ERROR;
+ }
+
+ nxt_memzero(sw, sizeof(nxt_start_worker_t));
+
+ sw->stream = nxt_random(&task->thread->random);
+ sw->app = app;
+ sw->rc = rc;
+ sw->mem_pool = c->mem_pool;
+ sw->joint = c->listen->socket.data;
+
+ sw->work.handler = nxt_router_send_sw_request;
+ sw->work.task = task;
+ sw->work.obj = sw;
+ sw->work.data = task->thread->engine;
+
+ rt = task->thread->runtime;
+
+ master_port = rt->port_by_type[NXT_PROCESS_MASTER];
+
+ nxt_debug(task, "post send sw %uxD to master engine %p", sw->stream,
+ master_port->engine);
+
+ nxt_event_engine_post(master_port->engine, &sw->work);
+
+ return NXT_AGAIN;
}
@@ -1715,13 +2023,12 @@ nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data)
}
size = c->read->mem.free - c->read->mem.pos;
- nxt_memcpy(b->mem.pos, c->read->mem.pos, size);
- b->mem.free += size;
- c->read = b;
+ c->read = nxt_buf_cpy(b, c->read->mem.pos, size);
} else {
- // TODO 500 Too long request headers
- nxt_log_alert(task->log, "Too long request headers");
+ nxt_router_gen_error(task, c, 400,
+ "Too long request headers");
+ return;
}
}
}
@@ -1735,15 +2042,12 @@ nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data)
b = nxt_buf_mem_alloc(c->mem_pool, h->parsed_content_length, 0);
if (nxt_slow_path(b == NULL)) {
- // TODO 500 Failed to allocate buffer for request body
- nxt_log_alert(task->log, "Failed to allocate buffer for "
- "request body");
+ nxt_router_gen_error(task, c, 500, "Failed to allocate "
+ "buffer for request body");
+ return;
}
- b->mem.free = nxt_cpymem(b->mem.free, c->read->mem.pos,
- preread);
-
- c->read = b;
+ c->read = nxt_buf_cpy(b, c->read->mem.pos, preread);
}
nxt_debug(task, "router request body read again, rest: %uz",
@@ -1761,26 +2065,11 @@ static void
nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
nxt_app_parse_ctx_t *ap)
{
- nxt_mp_t *port_mp;
nxt_int_t res;
- nxt_port_t *port, *c_port;
nxt_req_id_t req_id;
- nxt_app_wmsg_t wmsg;
nxt_event_engine_t *engine;
nxt_req_conn_link_t *rc;
- if (nxt_slow_path(nxt_app == NULL)) {
- // 500 Application not found
- nxt_log_alert(task->log, "application is NULL");
- }
-
- port = nxt_router_app_port(task);
-
- if (nxt_slow_path(port == NULL)) {
- // 500 Application port not found
- nxt_log_alert(task->log, "application port not found");
- }
-
engine = task->thread->engine;
do {
@@ -1790,8 +2079,10 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
rc = nxt_conn_request_add(c, req_id);
if (nxt_slow_path(rc == NULL)) {
- // 500 Failed to allocate req->conn link
- nxt_log_alert(task->log, "failed to allocate req->conn link");
+ nxt_router_gen_error(task, c, 500, "Failed to allocate "
+ "req->conn link");
+
+ return;
}
nxt_event_engine_request_add(engine, rc);
@@ -1799,33 +2090,66 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
nxt_debug(task, "req_id %uxD linked to conn %p at engine %p",
req_id, c, engine);
+ rc->reply_port = engine->port;
+
+ res = nxt_router_app_port(task, rc);
+
+ if (res != NXT_OK) {
+ return;
+ }
+
+ nxt_router_process_http_request_mp(task, rc, c->mem_pool);
+}
+
+
+static void
+nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_conn_link_t *rc,
+ nxt_mp_t *mp)
+{
+ nxt_mp_t *port_mp;
+ nxt_int_t res;
+ nxt_port_t *port, *c_port, *reply_port;
+ nxt_app_wmsg_t wmsg;
+ nxt_app_parse_ctx_t *ap;
+
+ port = rc->app_port;
+
+ if (nxt_slow_path(port == NULL)) {
+ nxt_router_gen_error(task, rc->conn, 500, "Application port not found");
+ return;
+ }
+
+ reply_port = rc->reply_port;
+ ap = rc->conn->socket.data;
+
port_mp = port->mem_pool;
- port->mem_pool = c->mem_pool;
+ port->mem_pool = mp;
- c_port = nxt_process_connected_port_find(port->process,
- engine->port->pid,
- engine->port->id);
- if (nxt_slow_path(c_port != engine->port)) {
- res = nxt_port_send_port(task, port, engine->port);
+ c_port = nxt_process_connected_port_find(port->process, reply_port->pid,
+ reply_port->id);
+ if (nxt_slow_path(c_port != reply_port)) {
+ res = nxt_port_send_port(task, port, reply_port, 0);
if (nxt_slow_path(res != NXT_OK)) {
- // 500 Failed to send reply port
- nxt_log_alert(task->log, "failed to send reply port to application");
+ nxt_router_gen_error(task, rc->conn, 500,
+ "Failed to send reply port to application");
+ goto fail;
}
- nxt_process_connected_port_add(port->process, engine->port);
+ nxt_process_connected_port_add(port->process, reply_port);
}
wmsg.port = port;
wmsg.write = NULL;
wmsg.buf = &wmsg.write;
- wmsg.stream = req_id;
+ wmsg.stream = rc->req_id;
- res = nxt_app->prepare_msg(task, &ap->r, &wmsg);
+ res = rc->app_port->app->module->prepare_msg(task, &ap->r, &wmsg);
if (nxt_slow_path(res != NXT_OK)) {
- // 500 Failed to prepare message
- nxt_log_alert(task->log, "failed to prepare message for application");
+ nxt_router_gen_error(task, rc->conn, 500,
+ "Failed to prepare message for application");
+ goto fail;
}
nxt_debug(task, "about to send %d bytes buffer to worker port %d",
@@ -1833,13 +2157,15 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
wmsg.port->socket.fd);
res = nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA,
- -1, req_id, engine->port->id, wmsg.write);
+ -1, rc->req_id, reply_port->id, wmsg.write);
if (nxt_slow_path(res != NXT_OK)) {
- // 500 Failed to send message
- nxt_log_alert(task->log, "failed to send message to application");
+ nxt_router_gen_error(task, rc->conn, 500,
+ "Failed to send message to application");
+ goto fail;
}
+fail:
port->mem_pool = port_mp;
}
@@ -1934,6 +2260,12 @@ nxt_router_conn_free(nxt_task_t *task, void *obj, void *data)
nxt_debug(task, "conn %p close, req %uxD", c, rc->req_id);
+ if (rc->app_port != NULL) {
+ nxt_router_app_release_port(task, rc->app_port, rc->app_port->app);
+
+ rc->app_port = NULL;
+ }
+
nxt_event_engine_request_remove(task->thread->engine, rc);
} nxt_queue_loop;
@@ -1944,9 +2276,9 @@ nxt_router_conn_free(nxt_task_t *task, void *obj, void *data)
task = &task->thread->engine->task;
- nxt_mp_release(c->mem_pool, c);
-
- nxt_router_conf_release(task, joint);
+ if (nxt_mp_release(c->mem_pool, c) == 0) {
+ nxt_router_conf_release(task, joint);
+ }
}
@@ -1992,3 +2324,71 @@ nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data)
return nxt_value_at(nxt_msec_t, joint->socket_conf, data);
}
+
+
+static nxt_int_t
+nxt_sw_test(nxt_lvlhsh_query_t *lhq, void *data)
+{
+ return NXT_OK;
+}
+
+
+static const nxt_lvlhsh_proto_t lvlhsh_sw_proto nxt_aligned(64) = {
+ NXT_LVLHSH_DEFAULT,
+ nxt_sw_test,
+ nxt_lvlhsh_alloc,
+ nxt_lvlhsh_free,
+};
+
+
+static void
+nxt_router_sw_add(nxt_task_t *task, nxt_router_t *router,
+ nxt_start_worker_t *sw)
+{
+ nxt_lvlhsh_query_t lhq;
+
+ lhq.key_hash = nxt_murmur_hash2(&sw->stream, sizeof(sw->stream));
+ lhq.key.length = sizeof(sw->stream);
+ lhq.key.start = (u_char *) &sw->stream;
+ lhq.proto = &lvlhsh_sw_proto;
+ lhq.replace = 0;
+ lhq.value = sw;
+ lhq.pool = task->thread->runtime->mem_pool;
+
+ switch (nxt_lvlhsh_insert(&router->start_workers, &lhq)) {
+
+ case NXT_OK:
+ break;
+
+ default:
+ nxt_log_error(NXT_LOG_WARN, task->log, "stream %08uxD sw add failed",
+ sw->stream);
+ break;
+ }
+}
+
+
+static nxt_start_worker_t *
+nxt_router_sw_find_remove(nxt_task_t *task, nxt_router_t *router, uint32_t id)
+{
+ nxt_lvlhsh_query_t lhq;
+
+ lhq.key_hash = nxt_murmur_hash2(&id, sizeof(id));
+ lhq.key.length = sizeof(id);
+ lhq.key.start = (u_char *) &id;
+ lhq.proto = &lvlhsh_sw_proto;
+ lhq.pool = task->thread->runtime->mem_pool;
+
+ switch (nxt_lvlhsh_delete(&router->start_workers, &lhq)) {
+
+ case NXT_OK:
+ return lhq.value;
+
+ default:
+ nxt_log_error(NXT_LOG_WARN, task->log, "stream %08uxD sw remove failed",
+ id);
+ break;
+ }
+
+ return NULL;
+}