diff options
author | Max Romanov <max.romanov@nginx.com> | 2017-06-23 19:20:08 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2017-06-23 19:20:08 +0300 |
commit | b8f126dcdfdf04bb01b70f9590fc64b3e155e119 (patch) | |
tree | 49fc84fb72e1483103c639e5c394820d8127223f /src/nxt_router.c | |
parent | 4a1b59c27a8e85fc3b03c420fbc1642ce52e96cf (diff) | |
download | unit-b8f126dcdfdf04bb01b70f9590fc64b3e155e119.tar.gz unit-b8f126dcdfdf04bb01b70f9590fc64b3e155e119.tar.bz2 |
Added basic HTTP request processing in router.
- request to connection mapping in engine;
- requests queue in connection;
- engine port creation;
- connected ports hash for each process;
- engine port data messages processing (app responses);
Diffstat (limited to 'src/nxt_router.c')
-rw-r--r-- | src/nxt_router.c | 378 |
1 files changed, 349 insertions, 29 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c index 75edba40..d55862f7 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -6,6 +6,7 @@ */ #include <nxt_router.h> +#include <nxt_application.h> static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task, @@ -64,6 +65,9 @@ static void nxt_router_conf_release(nxt_task_t *task, static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data); +static void nxt_router_process_http_request(nxt_task_t *task, + nxt_conn_t *c, nxt_app_parse_ctx_t *ap); +static void nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data); @@ -79,6 +83,11 @@ nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt) nxt_router_temp_conf_t *tmcf; const nxt_event_interface_t *interface; + ret = nxt_app_http_init(task, rt); + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } + router = nxt_zalloc(sizeof(nxt_router_t)); if (nxt_slow_path(router == NULL)) { return NXT_ERROR; @@ -519,6 +528,7 @@ nxt_router_engine_joints_create(nxt_task_t *task, nxt_mp_t *mp, joint->count = 1; joint->socket_conf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); + joint->engine = recf->engine; } return NXT_OK; @@ -578,7 +588,10 @@ static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, nxt_event_engine_t *engine) { + nxt_mp_t *mp; nxt_int_t ret; + nxt_port_t *port; + nxt_process_t *process; nxt_thread_link_t *link; nxt_thread_handle_t handle; @@ -596,6 +609,36 @@ nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, nxt_queue_insert_tail(&rt->engines, &engine->link); + + process = nxt_runtime_process_find(rt, nxt_pid); + if (nxt_slow_path(process == NULL)) { + return NXT_ERROR; + } + + port = nxt_process_port_new(process); + if (nxt_slow_path(port == NULL)) { + return NXT_ERROR; + } + + ret = nxt_port_socket_init(task, port, 0); + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } + + mp = nxt_mp_create(1024, 128, 256, 32); + if (nxt_slow_path(mp == NULL)) { + return NXT_ERROR; + } + + port->mem_pool = mp; + port->engine = 0; + port->type = NXT_PROCESS_ROUTER; + + engine->port = port; + + nxt_runtime_port_add(rt, port); + + ret = nxt_thread_create(&handle, link); if (nxt_slow_path(ret != NXT_OK)) { @@ -637,14 +680,28 @@ nxt_router_engine_post(nxt_router_engine_conf_t *recf) static void +nxt_router_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); + +nxt_port_handler_t nxt_router_process_port_handlers[] = { + NULL, + nxt_port_new_port_handler, + nxt_port_change_log_file_handler, + nxt_port_mmap_handler, + nxt_router_data_handler, +}; + + +static void nxt_router_thread_start(void *data) { + nxt_task_t *task; nxt_thread_t *thread; nxt_thread_link_t *link; nxt_event_engine_t *engine; link = data; engine = link->engine; + task = &engine->task; thread = nxt_thread(); @@ -657,6 +714,9 @@ nxt_router_thread_start(void *data) thread->task = &engine->task; thread->fiber = &engine->fibers->fiber; + engine->port->socket.task = task; + nxt_port_create(task, engine->port, nxt_router_process_port_handlers); + engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64); nxt_event_engine_start(engine); @@ -913,51 +973,159 @@ nxt_router_conn_init(nxt_task_t *task, void *obj, void *data) static const nxt_conn_state_t nxt_router_conn_write_state nxt_aligned(64) = { - .ready_handler = nxt_router_conn_close, + .ready_handler = nxt_router_conn_ready, .close_handler = nxt_router_conn_close, .error_handler = nxt_router_conn_error, }; static void +nxt_router_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + size_t dump_size; + nxt_buf_t *b, *i, *last; + nxt_conn_t *c; + nxt_work_queue_t *wq; + nxt_req_conn_link_t *rc; + nxt_event_engine_t *engine; + + b = msg->buf; + engine = task->thread->engine; + wq = &engine->fast_work_queue; + + rc = nxt_event_engine_request_find(engine, msg->port_msg.stream); + if (nxt_slow_path(rc == NULL)) { + + nxt_debug(task, "request id %08uxD not found", msg->port_msg.stream); + + /* complete buffer(s) */ + for (i = b; i != NULL; i = i->next) { + i->mem.pos = i->mem.free; + + nxt_work_queue_add(wq, i->completion_handler, task, i, i->parent); + } + + return; + } + + c = rc->conn; + + dump_size = nxt_buf_used_size(b); + + if (dump_size > 300) { + dump_size = 300; + } + + nxt_debug(task, "%srouter data (%z): %*s", + msg->port_msg.last ? "last " : "", msg->size, dump_size, + b->mem.pos); + + if (msg->size == 0) { + b = NULL; + } + + if (msg->port_msg.last != 0) { + nxt_debug(task, "router data create last buf"); + + last = nxt_buf_sync_alloc(c->mem_pool, NXT_BUF_SYNC_LAST); + if (nxt_slow_path(last == NULL)) { + /* TODO pogorevaTb */ + } + + nxt_buf_chain_add(&b, last); + } + + if (b == NULL) { + return; + } + + if (c->write == NULL) { + c->write = b; + c->write_state = &nxt_router_conn_write_state; + + nxt_conn_write(task->thread->engine, c); + } else { + nxt_debug(task, "router data attach out bufs to existing chain"); + + nxt_buf_chain_add(&c->write, b); + } +} + + +nxt_inline nxt_port_t * +nxt_router_app_port(nxt_task_t *task) +{ + nxt_port_t *port; + nxt_runtime_t *rt; + + rt = task->thread->runtime; + + nxt_runtime_port_each(rt, port) { + + if (nxt_pid == port->pid) { + continue; + } + + if (port->type == NXT_PROCESS_WORKER) { + return port; + } + + } nxt_runtime_port_loop; + + return NULL; +} + + +static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data) { - size_t size; + size_t size, preread; nxt_int_t ret; nxt_buf_t *b; nxt_conn_t *c; + nxt_app_parse_ctx_t *ap; nxt_socket_conf_joint_t *joint; - nxt_http_request_parse_t *rp; + nxt_app_request_header_t *h; c = obj; - rp = data; + ap = data; + b = c->read; nxt_debug(task, "router conn http header parse"); - if (rp == NULL) { - rp = nxt_mp_zget(c->mem_pool, sizeof(nxt_http_request_parse_t)); - if (nxt_slow_path(rp == NULL)) { + if (ap == NULL) { + ap = nxt_mp_zget(c->mem_pool, sizeof(nxt_app_parse_ctx_t)); + if (nxt_slow_path(ap == NULL)) { nxt_router_conn_close(task, c, data); return; } - c->socket.data = rp; - - ret = nxt_http_parse_request_init(rp, c->mem_pool); + ret = nxt_app_http_req_init(task, ap); if (nxt_slow_path(ret != NXT_OK)) { nxt_router_conn_close(task, c, data); return; } + + c->socket.data = ap; } - ret = nxt_http_parse_request(rp, &c->read->mem); + h = &ap->r.header; + + ret = nxt_app_http_req_parse(task, ap, b); nxt_debug(task, "http parse request: %d", ret); switch (nxt_expect(NXT_DONE, ret)) { case NXT_DONE: - break; + preread = nxt_buf_mem_used_size(&b->mem); + + nxt_debug(task, "router request header parsing complete, " + "content length: %O, preread: %uz", + h->parsed_content_length, preread); + + nxt_router_process_http_request(task, c, ap); + return; case NXT_ERROR: nxt_router_conn_close(task, c, data); @@ -965,32 +1133,122 @@ nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data) default: /* NXT_AGAIN */ - if (c->read->mem.free == c->read->mem.end) { - joint = c->listen->socket.data; - size = joint->socket_conf->large_header_buffer_size, + if (h->done == 0) { + + if (c->read->mem.free == c->read->mem.end) { + joint = c->listen->socket.data; + size = joint->socket_conf->large_header_buffer_size; + + if (size > (size_t) nxt_buf_mem_size(&b->mem)) { + b = nxt_buf_mem_alloc(c->mem_pool, size, 0); + if (nxt_slow_path(b == NULL)) { + nxt_router_conn_close(task, c, data); + return; + } + + size = c->read->mem.free - c->read->mem.pos; + nxt_memcpy(b->mem.pos, c->read->mem.pos, size); - b = nxt_buf_mem_alloc(c->mem_pool, size, 0); - if (nxt_slow_path(b == NULL)) { - nxt_router_conn_close(task, c, data); - return; + b->mem.free += size; + c->read = b; + } else { + // TODO 500 Too long request headers + nxt_log_alert(task->log, "Too long request headers"); + } } + } + + if (ap->r.body.done == 0) { + + preread = nxt_buf_mem_used_size(&b->mem); + + if (h->parsed_content_length - preread > + (size_t) nxt_buf_mem_free_size(&b->mem)) { + + b = nxt_buf_mem_alloc(c->mem_pool, h->parsed_content_length, 0); + if (nxt_slow_path(b == NULL)) { + // TODO 500 Failed to allocate buffer for request body + nxt_log_alert(task->log, "Failed to allocate buffer for " + "request body"); + } - size = c->read->mem.free - c->read->mem.pos; - nxt_memcpy(b->mem.pos, c->read->mem.pos, size); + b->mem.free = nxt_cpymem(b->mem.free, c->read->mem.pos, + preread); + + c->read = b; + } + + nxt_debug(task, "router request body read again, rest: %uz", + h->parsed_content_length - preread); - b->mem.free += size; - c->read = b; } - nxt_conn_read(task->thread->engine, c); - return; } - c->write = c->read; - c->write->mem.pos = c->write->mem.start; - c->write_state = &nxt_router_conn_write_state; + nxt_conn_read(task->thread->engine, c); +} + + +static void +nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, + nxt_app_parse_ctx_t *ap) +{ + nxt_port_t *port, *c_port; + nxt_req_id_t req_id; + nxt_app_wmsg_t wmsg; + nxt_event_engine_t *engine; + nxt_req_conn_link_t *rc; + + if (nxt_slow_path(nxt_app == NULL)) { + // 500 Application not found + nxt_log_alert(task->log, "application is NULL"); + } + + port = nxt_router_app_port(task); + + if (nxt_slow_path(port == NULL)) { + // 500 Application port not found + nxt_log_alert(task->log, "application port not found"); + } + + engine = task->thread->engine; + + do { + req_id = nxt_random(&nxt_random_data); + } while (nxt_event_engine_request_find(engine, req_id) != NULL); + + rc = nxt_conn_request_add(c, req_id); + if (nxt_slow_path(rc == NULL)) { + // 500 Failed to allocate req->conn link + nxt_log_alert(task->log, "failed to allocate req->conn link"); + } + + nxt_event_engine_request_add(engine, rc); + + nxt_debug(task, "req_id %uxD linked to conn %p at engine %p", + req_id, c, engine); - nxt_conn_write(task->thread->engine, c); + c_port = nxt_process_connected_port_find(port->process, + engine->port->pid, + engine->port->id); + if (nxt_slow_path(c_port != engine->port)) { + (void) nxt_port_send_port(task, port, engine->port); + nxt_process_connected_port_add(port->process, engine->port); + } + + wmsg.port = port; + wmsg.write = NULL; + wmsg.buf = &wmsg.write; + wmsg.stream = req_id; + + (void)nxt_app->prepare_msg(task, &ap->r, &wmsg); + + nxt_debug(task, "about to send %d bytes buffer to worker port %d", + nxt_buf_used_size(wmsg.write), + wmsg.port->socket.fd); + + (void) nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA, + -1, req_id, engine->port->id, wmsg.write); } @@ -1002,6 +1260,59 @@ static const nxt_conn_state_t nxt_router_conn_close_state static void +nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data) +{ + nxt_buf_t *b; + nxt_bool_t last; + nxt_conn_t *c; + nxt_work_queue_t *wq; + + nxt_debug(task, "router conn ready %p", obj); + + c = obj; + b = c->write; + + wq = &task->thread->engine->fast_work_queue; + + last = 0; + + while (b != NULL) { + if (!nxt_buf_is_sync(b)) { + if (nxt_buf_used_size(b) > 0) { + break; + } + } + + if (nxt_buf_is_last(b)) { + last = 1; + } + + nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); + + b = b->next; + } + + c->write = b; + + if (b != NULL) { + nxt_debug(task, "router conn %p has more data to write", obj); + + nxt_conn_write(task->thread->engine, c); + } else { + nxt_debug(task, "router conn %p no more data to write, last = %d", obj, + last); + + if (last != 0) { + nxt_debug(task, "enqueue router conn close %p (ready handler)", c); + + nxt_work_queue_add(wq, nxt_router_conn_close, task, c, + c->socket.data); + } + } +} + + +static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data) { nxt_conn_t *c; @@ -1020,6 +1331,7 @@ static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data) { nxt_conn_t *c; + nxt_req_conn_link_t *rc; nxt_socket_conf_joint_t *joint; c = obj; @@ -1029,6 +1341,14 @@ nxt_router_conn_free(nxt_task_t *task, void *obj, void *data) joint = c->listen->socket.data; nxt_router_conf_release(task, joint); + nxt_queue_each(rc, &c->requests, nxt_req_conn_link_t, link) { + + nxt_debug(task, "conn %p close, req %uxD", c, rc->req_id); + + nxt_event_engine_request_remove(task->thread->engine, rc); + + } nxt_queue_loop; + nxt_mp_destroy(c->mem_pool); } |