summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_router.c
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2017-06-23 19:20:08 +0300
committerMax Romanov <max.romanov@nginx.com>2017-06-23 19:20:08 +0300
commitb8f126dcdfdf04bb01b70f9590fc64b3e155e119 (patch)
tree49fc84fb72e1483103c639e5c394820d8127223f /src/nxt_router.c
parent4a1b59c27a8e85fc3b03c420fbc1642ce52e96cf (diff)
downloadunit-b8f126dcdfdf04bb01b70f9590fc64b3e155e119.tar.gz
unit-b8f126dcdfdf04bb01b70f9590fc64b3e155e119.tar.bz2
Added basic HTTP request processing in router.
- request to connection mapping in engine; - requests queue in connection; - engine port creation; - connected ports hash for each process; - engine port data messages processing (app responses);
Diffstat (limited to 'src/nxt_router.c')
-rw-r--r--src/nxt_router.c378
1 files changed, 349 insertions, 29 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 75edba40..d55862f7 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -6,6 +6,7 @@
*/
#include <nxt_router.h>
+#include <nxt_application.h>
static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task,
@@ -64,6 +65,9 @@ static void nxt_router_conf_release(nxt_task_t *task,
static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj,
void *data);
+static void nxt_router_process_http_request(nxt_task_t *task,
+ nxt_conn_t *c, nxt_app_parse_ctx_t *ap);
+static void nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data);
@@ -79,6 +83,11 @@ nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt)
nxt_router_temp_conf_t *tmcf;
const nxt_event_interface_t *interface;
+ ret = nxt_app_http_init(task, rt);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return ret;
+ }
+
router = nxt_zalloc(sizeof(nxt_router_t));
if (nxt_slow_path(router == NULL)) {
return NXT_ERROR;
@@ -519,6 +528,7 @@ nxt_router_engine_joints_create(nxt_task_t *task, nxt_mp_t *mp,
joint->count = 1;
joint->socket_conf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
+ joint->engine = recf->engine;
}
return NXT_OK;
@@ -578,7 +588,10 @@ static nxt_int_t
nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
nxt_event_engine_t *engine)
{
+ nxt_mp_t *mp;
nxt_int_t ret;
+ nxt_port_t *port;
+ nxt_process_t *process;
nxt_thread_link_t *link;
nxt_thread_handle_t handle;
@@ -596,6 +609,36 @@ nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
nxt_queue_insert_tail(&rt->engines, &engine->link);
+
+ process = nxt_runtime_process_find(rt, nxt_pid);
+ if (nxt_slow_path(process == NULL)) {
+ return NXT_ERROR;
+ }
+
+ port = nxt_process_port_new(process);
+ if (nxt_slow_path(port == NULL)) {
+ return NXT_ERROR;
+ }
+
+ ret = nxt_port_socket_init(task, port, 0);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return ret;
+ }
+
+ mp = nxt_mp_create(1024, 128, 256, 32);
+ if (nxt_slow_path(mp == NULL)) {
+ return NXT_ERROR;
+ }
+
+ port->mem_pool = mp;
+ port->engine = 0;
+ port->type = NXT_PROCESS_ROUTER;
+
+ engine->port = port;
+
+ nxt_runtime_port_add(rt, port);
+
+
ret = nxt_thread_create(&handle, link);
if (nxt_slow_path(ret != NXT_OK)) {
@@ -637,14 +680,28 @@ nxt_router_engine_post(nxt_router_engine_conf_t *recf)
static void
+nxt_router_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
+
+nxt_port_handler_t nxt_router_process_port_handlers[] = {
+ NULL,
+ nxt_port_new_port_handler,
+ nxt_port_change_log_file_handler,
+ nxt_port_mmap_handler,
+ nxt_router_data_handler,
+};
+
+
+static void
nxt_router_thread_start(void *data)
{
+ nxt_task_t *task;
nxt_thread_t *thread;
nxt_thread_link_t *link;
nxt_event_engine_t *engine;
link = data;
engine = link->engine;
+ task = &engine->task;
thread = nxt_thread();
@@ -657,6 +714,9 @@ nxt_router_thread_start(void *data)
thread->task = &engine->task;
thread->fiber = &engine->fibers->fiber;
+ engine->port->socket.task = task;
+ nxt_port_create(task, engine->port, nxt_router_process_port_handlers);
+
engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
nxt_event_engine_start(engine);
@@ -913,51 +973,159 @@ nxt_router_conn_init(nxt_task_t *task, void *obj, void *data)
static const nxt_conn_state_t nxt_router_conn_write_state
nxt_aligned(64) =
{
- .ready_handler = nxt_router_conn_close,
+ .ready_handler = nxt_router_conn_ready,
.close_handler = nxt_router_conn_close,
.error_handler = nxt_router_conn_error,
};
static void
+nxt_router_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
+{
+ size_t dump_size;
+ nxt_buf_t *b, *i, *last;
+ nxt_conn_t *c;
+ nxt_work_queue_t *wq;
+ nxt_req_conn_link_t *rc;
+ nxt_event_engine_t *engine;
+
+ b = msg->buf;
+ engine = task->thread->engine;
+ wq = &engine->fast_work_queue;
+
+ rc = nxt_event_engine_request_find(engine, msg->port_msg.stream);
+ if (nxt_slow_path(rc == NULL)) {
+
+ nxt_debug(task, "request id %08uxD not found", msg->port_msg.stream);
+
+ /* complete buffer(s) */
+ for (i = b; i != NULL; i = i->next) {
+ i->mem.pos = i->mem.free;
+
+ nxt_work_queue_add(wq, i->completion_handler, task, i, i->parent);
+ }
+
+ return;
+ }
+
+ c = rc->conn;
+
+ dump_size = nxt_buf_used_size(b);
+
+ if (dump_size > 300) {
+ dump_size = 300;
+ }
+
+ nxt_debug(task, "%srouter data (%z): %*s",
+ msg->port_msg.last ? "last " : "", msg->size, dump_size,
+ b->mem.pos);
+
+ if (msg->size == 0) {
+ b = NULL;
+ }
+
+ if (msg->port_msg.last != 0) {
+ nxt_debug(task, "router data create last buf");
+
+ last = nxt_buf_sync_alloc(c->mem_pool, NXT_BUF_SYNC_LAST);
+ if (nxt_slow_path(last == NULL)) {
+ /* TODO pogorevaTb */
+ }
+
+ nxt_buf_chain_add(&b, last);
+ }
+
+ if (b == NULL) {
+ return;
+ }
+
+ if (c->write == NULL) {
+ c->write = b;
+ c->write_state = &nxt_router_conn_write_state;
+
+ nxt_conn_write(task->thread->engine, c);
+ } else {
+ nxt_debug(task, "router data attach out bufs to existing chain");
+
+ nxt_buf_chain_add(&c->write, b);
+ }
+}
+
+
+nxt_inline nxt_port_t *
+nxt_router_app_port(nxt_task_t *task)
+{
+ nxt_port_t *port;
+ nxt_runtime_t *rt;
+
+ rt = task->thread->runtime;
+
+ nxt_runtime_port_each(rt, port) {
+
+ if (nxt_pid == port->pid) {
+ continue;
+ }
+
+ if (port->type == NXT_PROCESS_WORKER) {
+ return port;
+ }
+
+ } nxt_runtime_port_loop;
+
+ return NULL;
+}
+
+
+static void
nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data)
{
- size_t size;
+ size_t size, preread;
nxt_int_t ret;
nxt_buf_t *b;
nxt_conn_t *c;
+ nxt_app_parse_ctx_t *ap;
nxt_socket_conf_joint_t *joint;
- nxt_http_request_parse_t *rp;
+ nxt_app_request_header_t *h;
c = obj;
- rp = data;
+ ap = data;
+ b = c->read;
nxt_debug(task, "router conn http header parse");
- if (rp == NULL) {
- rp = nxt_mp_zget(c->mem_pool, sizeof(nxt_http_request_parse_t));
- if (nxt_slow_path(rp == NULL)) {
+ if (ap == NULL) {
+ ap = nxt_mp_zget(c->mem_pool, sizeof(nxt_app_parse_ctx_t));
+ if (nxt_slow_path(ap == NULL)) {
nxt_router_conn_close(task, c, data);
return;
}
- c->socket.data = rp;
-
- ret = nxt_http_parse_request_init(rp, c->mem_pool);
+ ret = nxt_app_http_req_init(task, ap);
if (nxt_slow_path(ret != NXT_OK)) {
nxt_router_conn_close(task, c, data);
return;
}
+
+ c->socket.data = ap;
}
- ret = nxt_http_parse_request(rp, &c->read->mem);
+ h = &ap->r.header;
+
+ ret = nxt_app_http_req_parse(task, ap, b);
nxt_debug(task, "http parse request: %d", ret);
switch (nxt_expect(NXT_DONE, ret)) {
case NXT_DONE:
- break;
+ preread = nxt_buf_mem_used_size(&b->mem);
+
+ nxt_debug(task, "router request header parsing complete, "
+ "content length: %O, preread: %uz",
+ h->parsed_content_length, preread);
+
+ nxt_router_process_http_request(task, c, ap);
+ return;
case NXT_ERROR:
nxt_router_conn_close(task, c, data);
@@ -965,32 +1133,122 @@ nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data)
default: /* NXT_AGAIN */
- if (c->read->mem.free == c->read->mem.end) {
- joint = c->listen->socket.data;
- size = joint->socket_conf->large_header_buffer_size,
+ if (h->done == 0) {
+
+ if (c->read->mem.free == c->read->mem.end) {
+ joint = c->listen->socket.data;
+ size = joint->socket_conf->large_header_buffer_size;
+
+ if (size > (size_t) nxt_buf_mem_size(&b->mem)) {
+ b = nxt_buf_mem_alloc(c->mem_pool, size, 0);
+ if (nxt_slow_path(b == NULL)) {
+ nxt_router_conn_close(task, c, data);
+ return;
+ }
+
+ size = c->read->mem.free - c->read->mem.pos;
+ nxt_memcpy(b->mem.pos, c->read->mem.pos, size);
- b = nxt_buf_mem_alloc(c->mem_pool, size, 0);
- if (nxt_slow_path(b == NULL)) {
- nxt_router_conn_close(task, c, data);
- return;
+ b->mem.free += size;
+ c->read = b;
+ } else {
+ // TODO 500 Too long request headers
+ nxt_log_alert(task->log, "Too long request headers");
+ }
}
+ }
+
+ if (ap->r.body.done == 0) {
+
+ preread = nxt_buf_mem_used_size(&b->mem);
+
+ if (h->parsed_content_length - preread >
+ (size_t) nxt_buf_mem_free_size(&b->mem)) {
+
+ b = nxt_buf_mem_alloc(c->mem_pool, h->parsed_content_length, 0);
+ if (nxt_slow_path(b == NULL)) {
+ // TODO 500 Failed to allocate buffer for request body
+ nxt_log_alert(task->log, "Failed to allocate buffer for "
+ "request body");
+ }
- size = c->read->mem.free - c->read->mem.pos;
- nxt_memcpy(b->mem.pos, c->read->mem.pos, size);
+ b->mem.free = nxt_cpymem(b->mem.free, c->read->mem.pos,
+ preread);
+
+ c->read = b;
+ }
+
+ nxt_debug(task, "router request body read again, rest: %uz",
+ h->parsed_content_length - preread);
- b->mem.free += size;
- c->read = b;
}
- nxt_conn_read(task->thread->engine, c);
- return;
}
- c->write = c->read;
- c->write->mem.pos = c->write->mem.start;
- c->write_state = &nxt_router_conn_write_state;
+ nxt_conn_read(task->thread->engine, c);
+}
+
+
+static void
+nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
+ nxt_app_parse_ctx_t *ap)
+{
+ nxt_port_t *port, *c_port;
+ nxt_req_id_t req_id;
+ nxt_app_wmsg_t wmsg;
+ nxt_event_engine_t *engine;
+ nxt_req_conn_link_t *rc;
+
+ if (nxt_slow_path(nxt_app == NULL)) {
+ // 500 Application not found
+ nxt_log_alert(task->log, "application is NULL");
+ }
+
+ port = nxt_router_app_port(task);
+
+ if (nxt_slow_path(port == NULL)) {
+ // 500 Application port not found
+ nxt_log_alert(task->log, "application port not found");
+ }
+
+ engine = task->thread->engine;
+
+ do {
+ req_id = nxt_random(&nxt_random_data);
+ } while (nxt_event_engine_request_find(engine, req_id) != NULL);
+
+ rc = nxt_conn_request_add(c, req_id);
+ if (nxt_slow_path(rc == NULL)) {
+ // 500 Failed to allocate req->conn link
+ nxt_log_alert(task->log, "failed to allocate req->conn link");
+ }
+
+ nxt_event_engine_request_add(engine, rc);
+
+ nxt_debug(task, "req_id %uxD linked to conn %p at engine %p",
+ req_id, c, engine);
- nxt_conn_write(task->thread->engine, c);
+ c_port = nxt_process_connected_port_find(port->process,
+ engine->port->pid,
+ engine->port->id);
+ if (nxt_slow_path(c_port != engine->port)) {
+ (void) nxt_port_send_port(task, port, engine->port);
+ nxt_process_connected_port_add(port->process, engine->port);
+ }
+
+ wmsg.port = port;
+ wmsg.write = NULL;
+ wmsg.buf = &wmsg.write;
+ wmsg.stream = req_id;
+
+ (void)nxt_app->prepare_msg(task, &ap->r, &wmsg);
+
+ nxt_debug(task, "about to send %d bytes buffer to worker port %d",
+ nxt_buf_used_size(wmsg.write),
+ wmsg.port->socket.fd);
+
+ (void) nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA,
+ -1, req_id, engine->port->id, wmsg.write);
}
@@ -1002,6 +1260,59 @@ static const nxt_conn_state_t nxt_router_conn_close_state
static void
+nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_buf_t *b;
+ nxt_bool_t last;
+ nxt_conn_t *c;
+ nxt_work_queue_t *wq;
+
+ nxt_debug(task, "router conn ready %p", obj);
+
+ c = obj;
+ b = c->write;
+
+ wq = &task->thread->engine->fast_work_queue;
+
+ last = 0;
+
+ while (b != NULL) {
+ if (!nxt_buf_is_sync(b)) {
+ if (nxt_buf_used_size(b) > 0) {
+ break;
+ }
+ }
+
+ if (nxt_buf_is_last(b)) {
+ last = 1;
+ }
+
+ nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
+
+ b = b->next;
+ }
+
+ c->write = b;
+
+ if (b != NULL) {
+ nxt_debug(task, "router conn %p has more data to write", obj);
+
+ nxt_conn_write(task->thread->engine, c);
+ } else {
+ nxt_debug(task, "router conn %p no more data to write, last = %d", obj,
+ last);
+
+ if (last != 0) {
+ nxt_debug(task, "enqueue router conn close %p (ready handler)", c);
+
+ nxt_work_queue_add(wq, nxt_router_conn_close, task, c,
+ c->socket.data);
+ }
+ }
+}
+
+
+static void
nxt_router_conn_close(nxt_task_t *task, void *obj, void *data)
{
nxt_conn_t *c;
@@ -1020,6 +1331,7 @@ static void
nxt_router_conn_free(nxt_task_t *task, void *obj, void *data)
{
nxt_conn_t *c;
+ nxt_req_conn_link_t *rc;
nxt_socket_conf_joint_t *joint;
c = obj;
@@ -1029,6 +1341,14 @@ nxt_router_conn_free(nxt_task_t *task, void *obj, void *data)
joint = c->listen->socket.data;
nxt_router_conf_release(task, joint);
+ nxt_queue_each(rc, &c->requests, nxt_req_conn_link_t, link) {
+
+ nxt_debug(task, "conn %p close, req %uxD", c, rc->req_id);
+
+ nxt_event_engine_request_remove(task->thread->engine, rc);
+
+ } nxt_queue_loop;
+
nxt_mp_destroy(c->mem_pool);
}