diff options
author | Max Romanov <max.romanov@nginx.com> | 2017-06-23 19:20:08 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2017-06-23 19:20:08 +0300 |
commit | b8f126dcdfdf04bb01b70f9590fc64b3e155e119 (patch) | |
tree | 49fc84fb72e1483103c639e5c394820d8127223f | |
parent | 4a1b59c27a8e85fc3b03c420fbc1642ce52e96cf (diff) | |
download | unit-b8f126dcdfdf04bb01b70f9590fc64b3e155e119.tar.gz unit-b8f126dcdfdf04bb01b70f9590fc64b3e155e119.tar.bz2 |
Added basic HTTP request processing in router.
- request to connection mapping in engine;
- requests queue in connection;
- engine port creation;
- connected ports hash for each process;
- engine port data messages processing (app responses);
Diffstat (limited to '')
-rw-r--r-- | src/nxt_conn.c | 34 | ||||
-rw-r--r-- | src/nxt_conn.h | 18 | ||||
-rw-r--r-- | src/nxt_event_engine.c | 112 | ||||
-rw-r--r-- | src/nxt_event_engine.h | 10 | ||||
-rw-r--r-- | src/nxt_main.h | 4 | ||||
-rw-r--r-- | src/nxt_port.c | 2 | ||||
-rw-r--r-- | src/nxt_process.c | 26 | ||||
-rw-r--r-- | src/nxt_process.h | 12 | ||||
-rw-r--r-- | src/nxt_router.c | 378 |
9 files changed, 563 insertions, 33 deletions
diff --git a/src/nxt_conn.c b/src/nxt_conn.c index bba2bfd1..0cc052e8 100644 --- a/src/nxt_conn.c +++ b/src/nxt_conn.c @@ -85,6 +85,8 @@ nxt_conn_create(nxt_mp_t *mp, nxt_task_t *task) nxt_conn_timer_init(&c->read_timer, c, c->socket.read_work_queue); nxt_conn_timer_init(&c->write_timer, c, c->socket.write_work_queue); + nxt_queue_init(&c->requests); + nxt_log_debug(&c->log, "connections: %uD", thr->engine->connections); return c; @@ -150,3 +152,35 @@ nxt_conn_work_queue_set(nxt_conn_t *c, nxt_work_queue_t *wq) c->read_timer.work_queue = wq; c->write_timer.work_queue = wq; } + + +nxt_req_conn_link_t * +nxt_conn_request_add(nxt_conn_t *c, nxt_req_id_t req_id) +{ + nxt_req_conn_link_t *rc; + + rc = nxt_mp_zalloc(c->mem_pool, sizeof(nxt_req_conn_link_t)); + if (nxt_slow_path(rc == NULL)) { + nxt_thread_log_error(NXT_LOG_WARN, "failed to allocate req %08uxD " + "to conn", req_id); + return NULL; + } + + rc->req_id = req_id; + rc->conn = c; + + nxt_queue_insert_tail(&c->requests, &rc->link); + + return rc; +} + + +void +nxt_conn_request_remove(nxt_conn_t *c, nxt_req_conn_link_t *rc) +{ + nxt_queue_remove(&rc->link); + + nxt_mp_free(c->mem_pool, rc); +} + + diff --git a/src/nxt_conn.h b/src/nxt_conn.h index 479450bb..6c4de090 100644 --- a/src/nxt_conn.h +++ b/src/nxt_conn.h @@ -141,6 +141,8 @@ struct nxt_conn_s { nxt_conn_io_t *io; + nxt_queue_t requests; /* of nxt_req_conn_link_t */ + #if (NXT_SSLTLS || NXT_THREADS) /* SunC does not support "zero-sized struct/union". */ @@ -180,6 +182,16 @@ struct nxt_conn_s { }; +typedef uint32_t nxt_req_id_t; + +typedef struct { + nxt_req_id_t req_id; + nxt_conn_t *conn; + + nxt_queue_link_t link; +} nxt_req_conn_link_t; + + #define nxt_conn_timer_init(ev, c, wq) \ do { \ (ev)->work_queue = (wq); \ @@ -347,4 +359,10 @@ NXT_EXPORT void nxt_conn_proxy(nxt_task_t *task, nxt_conn_proxy_t *p); #define nxt_event_conn_close nxt_conn_close +NXT_EXPORT nxt_req_conn_link_t *nxt_conn_request_add(nxt_conn_t *c, + nxt_req_id_t req_id); +NXT_EXPORT void nxt_conn_request_remove(nxt_conn_t *c, + nxt_req_conn_link_t *rc); + + #endif /* _NXT_CONN_H_INCLUDED_ */ diff --git a/src/nxt_event_engine.c b/src/nxt_event_engine.c index d636dc25..9171bcd6 100644 --- a/src/nxt_event_engine.c +++ b/src/nxt_event_engine.c @@ -544,3 +544,115 @@ nxt_event_engine_start(nxt_event_engine_t *engine) nxt_timer_expire(engine, now); } } + + +static nxt_int_t +nxt_req_conn_test(nxt_lvlhsh_query_t *lhq, void *data) +{ + return NXT_OK; +} + +static const nxt_lvlhsh_proto_t lvlhsh_req_conn_proto nxt_aligned(64) = { + NXT_LVLHSH_DEFAULT, + nxt_req_conn_test, + nxt_lvlhsh_alloc, + nxt_lvlhsh_free, +}; + + +void +nxt_event_engine_request_add(nxt_event_engine_t *engine, + nxt_req_conn_link_t *rc) +{ + nxt_lvlhsh_query_t lhq; + + lhq.key_hash = nxt_murmur_hash2(&rc->req_id, sizeof(rc->req_id)); + lhq.key.length = sizeof(rc->req_id); + lhq.key.start = (u_char *) &rc->req_id; + lhq.proto = &lvlhsh_req_conn_proto; + lhq.replace = 0; + lhq.value = rc; + lhq.pool = engine->mem_pool; + + switch (nxt_lvlhsh_insert(&engine->requests, &lhq)) { + + case NXT_OK: + break; + + default: + nxt_thread_log_error(NXT_LOG_WARN, "req %08uxD to conn add failed", + rc->req_id); + break; + } +} + + +nxt_req_conn_link_t * +nxt_event_engine_request_find(nxt_event_engine_t *engine, nxt_req_id_t req_id) +{ + nxt_lvlhsh_query_t lhq; + + lhq.key_hash = nxt_murmur_hash2(&req_id, sizeof(req_id)); + lhq.key.length = sizeof(req_id); + lhq.key.start = (u_char *) &req_id; + lhq.proto = &lvlhsh_req_conn_proto; + + if (nxt_lvlhsh_find(&engine->requests, &lhq) == NXT_OK) { + return lhq.value; + } + + return NULL; +} + + +void +nxt_event_engine_request_remove(nxt_event_engine_t *engine, + nxt_req_conn_link_t *rc) +{ + nxt_lvlhsh_query_t lhq; + + lhq.key_hash = nxt_murmur_hash2(&rc->req_id, sizeof(rc->req_id)); + lhq.key.length = sizeof(rc->req_id); + lhq.key.start = (u_char *) &rc->req_id; + lhq.proto = &lvlhsh_req_conn_proto; + lhq.pool = engine->mem_pool; + + switch (nxt_lvlhsh_delete(&engine->requests, &lhq)) { + + case NXT_OK: + break; + + default: + nxt_thread_log_error(NXT_LOG_WARN, "req %08uxD to conn remove failed", + rc->req_id); + break; + } +} + + +nxt_req_conn_link_t * +nxt_event_engine_request_find_remove(nxt_event_engine_t *engine, + nxt_req_id_t req_id) +{ + nxt_lvlhsh_query_t lhq; + + lhq.key_hash = nxt_murmur_hash2(&req_id, sizeof(req_id)); + lhq.key.length = sizeof(req_id); + lhq.key.start = (u_char *) &req_id; + lhq.proto = &lvlhsh_req_conn_proto; + lhq.pool = engine->mem_pool; + + switch (nxt_lvlhsh_delete(&engine->requests, &lhq)) { + + case NXT_OK: + return lhq.value; + + default: + nxt_thread_log_error(NXT_LOG_WARN, "req %08uxD to conn remove failed", + req_id); + break; + } + + return NULL; +} + diff --git a/src/nxt_event_engine.h b/src/nxt_event_engine.h index d58949ba..ceb1654e 100644 --- a/src/nxt_event_engine.h +++ b/src/nxt_event_engine.h @@ -491,6 +491,7 @@ struct nxt_event_engine_s { nxt_queue_t joints; nxt_queue_t listen_connections; nxt_queue_t idle_connections; + nxt_lvlhsh_t requests; /* req_id to nxt_req_conn_link_t */ nxt_queue_link_t link; }; @@ -509,6 +510,15 @@ NXT_EXPORT void nxt_event_engine_post(nxt_event_engine_t *engine, NXT_EXPORT void nxt_event_engine_signal(nxt_event_engine_t *engine, nxt_uint_t signo); +NXT_EXPORT void nxt_event_engine_request_add(nxt_event_engine_t *engine, + nxt_req_conn_link_t *rc); +NXT_EXPORT nxt_req_conn_link_t *nxt_event_engine_request_find( + nxt_event_engine_t *engine, nxt_req_id_t req_id); +NXT_EXPORT void nxt_event_engine_request_remove(nxt_event_engine_t *engine, + nxt_req_conn_link_t *rc); +NXT_EXPORT nxt_req_conn_link_t *nxt_event_engine_request_find_remove( + nxt_event_engine_t *engine, nxt_req_id_t req_id); + nxt_inline nxt_event_engine_t * nxt_thread_event_engine(void) diff --git a/src/nxt_main.h b/src/nxt_main.h index 0aa2192c..4b9a9036 100644 --- a/src/nxt_main.h +++ b/src/nxt_main.h @@ -26,7 +26,6 @@ typedef struct nxt_runtime_s nxt_runtime_t; typedef uint16_t nxt_port_id_t; #include <nxt_queue.h> -#include <nxt_process.h> typedef struct nxt_thread_s nxt_thread_t; #include <nxt_thread_id.h> @@ -36,6 +35,8 @@ typedef struct nxt_thread_s nxt_thread_t; #include <nxt_random.h> #include <nxt_string.h> +#include <nxt_lvlhsh.h> +#include <nxt_process.h> #include <nxt_utf8.h> #include <nxt_file_name.h> @@ -90,7 +91,6 @@ typedef struct nxt_thread_pool_s nxt_thread_pool_t; #include <nxt_djb_hash.h> #include <nxt_murmur_hash.h> -#include <nxt_lvlhsh.h> #include <nxt_hash.h> #include <nxt_sort.h> diff --git a/src/nxt_port.c b/src/nxt_port.c index eb320640..91f0fe31 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -142,8 +142,6 @@ nxt_port_new_port_buf_completion(nxt_task_t *task, void *obj, void *data) b = obj; mp = b->data; - /* TODO: b->mem.pos */ - nxt_buf_free(mp, b); } diff --git a/src/nxt_process.c b/src/nxt_process.c index ce16cf7f..33af310d 100644 --- a/src/nxt_process.c +++ b/src/nxt_process.c @@ -540,3 +540,29 @@ nxt_process_port_new(nxt_process_t *process) return port; } + +void +nxt_process_connected_port_add(nxt_process_t *process, nxt_port_t *port) +{ + /* TODO lock ports */ + + nxt_port_hash_add(&process->connected_ports, process->mem_pool, port); +} + +void +nxt_process_connected_port_remove(nxt_process_t *process, nxt_port_t *port) +{ + /* TODO lock ports */ + + nxt_port_hash_remove(&process->connected_ports, process->mem_pool, port); +} + +nxt_port_t * +nxt_process_connected_port_find(nxt_process_t *process, nxt_pid_t pid, + nxt_port_id_t port_id) +{ + /* TODO lock ports */ + + return nxt_port_hash_find(&process->connected_ports, pid, port_id); +} + diff --git a/src/nxt_process.h b/src/nxt_process.h index 669c8f03..32a8d8dc 100644 --- a/src/nxt_process.h +++ b/src/nxt_process.h @@ -58,6 +58,8 @@ typedef struct { nxt_process_init_t *init; nxt_array_t *incoming; /* of nxt_mmap_t */ nxt_array_t *outgoing; /* of nxt_mmap_t */ + + nxt_lvlhsh_t connected_ports; /* of nxt_port_t */ } nxt_process_t; @@ -88,6 +90,16 @@ NXT_EXPORT nxt_port_t * nxt_process_port_new(nxt_process_t *process); #define nxt_process_port_loop \ nxt_queue_loop + +void nxt_process_connected_port_add(nxt_process_t *process, nxt_port_t *port); + +void nxt_process_connected_port_remove(nxt_process_t *process, + nxt_port_t *port); + +nxt_port_t *nxt_process_connected_port_find(nxt_process_t *process, + nxt_pid_t pid, nxt_port_id_t port_id); + + #if (NXT_HAVE_SETPROCTITLE) #define nxt_process_title(task, fmt, ...) \ diff --git a/src/nxt_router.c b/src/nxt_router.c index 75edba40..d55862f7 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -6,6 +6,7 @@ */ #include <nxt_router.h> +#include <nxt_application.h> static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task, @@ -64,6 +65,9 @@ static void nxt_router_conf_release(nxt_task_t *task, static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data); +static void nxt_router_process_http_request(nxt_task_t *task, + nxt_conn_t *c, nxt_app_parse_ctx_t *ap); +static void nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data); @@ -79,6 +83,11 @@ nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt) nxt_router_temp_conf_t *tmcf; const nxt_event_interface_t *interface; + ret = nxt_app_http_init(task, rt); + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } + router = nxt_zalloc(sizeof(nxt_router_t)); if (nxt_slow_path(router == NULL)) { return NXT_ERROR; @@ -519,6 +528,7 @@ nxt_router_engine_joints_create(nxt_task_t *task, nxt_mp_t *mp, joint->count = 1; joint->socket_conf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); + joint->engine = recf->engine; } return NXT_OK; @@ -578,7 +588,10 @@ static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, nxt_event_engine_t *engine) { + nxt_mp_t *mp; nxt_int_t ret; + nxt_port_t *port; + nxt_process_t *process; nxt_thread_link_t *link; nxt_thread_handle_t handle; @@ -596,6 +609,36 @@ nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, nxt_queue_insert_tail(&rt->engines, &engine->link); + + process = nxt_runtime_process_find(rt, nxt_pid); + if (nxt_slow_path(process == NULL)) { + return NXT_ERROR; + } + + port = nxt_process_port_new(process); + if (nxt_slow_path(port == NULL)) { + return NXT_ERROR; + } + + ret = nxt_port_socket_init(task, port, 0); + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } + + mp = nxt_mp_create(1024, 128, 256, 32); + if (nxt_slow_path(mp == NULL)) { + return NXT_ERROR; + } + + port->mem_pool = mp; + port->engine = 0; + port->type = NXT_PROCESS_ROUTER; + + engine->port = port; + + nxt_runtime_port_add(rt, port); + + ret = nxt_thread_create(&handle, link); if (nxt_slow_path(ret != NXT_OK)) { @@ -637,14 +680,28 @@ nxt_router_engine_post(nxt_router_engine_conf_t *recf) static void +nxt_router_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); + +nxt_port_handler_t nxt_router_process_port_handlers[] = { + NULL, + nxt_port_new_port_handler, + nxt_port_change_log_file_handler, + nxt_port_mmap_handler, + nxt_router_data_handler, +}; + + +static void nxt_router_thread_start(void *data) { + nxt_task_t *task; nxt_thread_t *thread; nxt_thread_link_t *link; nxt_event_engine_t *engine; link = data; engine = link->engine; + task = &engine->task; thread = nxt_thread(); @@ -657,6 +714,9 @@ nxt_router_thread_start(void *data) thread->task = &engine->task; thread->fiber = &engine->fibers->fiber; + engine->port->socket.task = task; + nxt_port_create(task, engine->port, nxt_router_process_port_handlers); + engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64); nxt_event_engine_start(engine); @@ -913,51 +973,159 @@ nxt_router_conn_init(nxt_task_t *task, void *obj, void *data) static const nxt_conn_state_t nxt_router_conn_write_state nxt_aligned(64) = { - .ready_handler = nxt_router_conn_close, + .ready_handler = nxt_router_conn_ready, .close_handler = nxt_router_conn_close, .error_handler = nxt_router_conn_error, }; static void +nxt_router_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + size_t dump_size; + nxt_buf_t *b, *i, *last; + nxt_conn_t *c; + nxt_work_queue_t *wq; + nxt_req_conn_link_t *rc; + nxt_event_engine_t *engine; + + b = msg->buf; + engine = task->thread->engine; + wq = &engine->fast_work_queue; + + rc = nxt_event_engine_request_find(engine, msg->port_msg.stream); + if (nxt_slow_path(rc == NULL)) { + + nxt_debug(task, "request id %08uxD not found", msg->port_msg.stream); + + /* complete buffer(s) */ + for (i = b; i != NULL; i = i->next) { + i->mem.pos = i->mem.free; + + nxt_work_queue_add(wq, i->completion_handler, task, i, i->parent); + } + + return; + } + + c = rc->conn; + + dump_size = nxt_buf_used_size(b); + + if (dump_size > 300) { + dump_size = 300; + } + + nxt_debug(task, "%srouter data (%z): %*s", + msg->port_msg.last ? "last " : "", msg->size, dump_size, + b->mem.pos); + + if (msg->size == 0) { + b = NULL; + } + + if (msg->port_msg.last != 0) { + nxt_debug(task, "router data create last buf"); + + last = nxt_buf_sync_alloc(c->mem_pool, NXT_BUF_SYNC_LAST); + if (nxt_slow_path(last == NULL)) { + /* TODO pogorevaTb */ + } + + nxt_buf_chain_add(&b, last); + } + + if (b == NULL) { + return; + } + + if (c->write == NULL) { + c->write = b; + c->write_state = &nxt_router_conn_write_state; + + nxt_conn_write(task->thread->engine, c); + } else { + nxt_debug(task, "router data attach out bufs to existing chain"); + + nxt_buf_chain_add(&c->write, b); + } +} + + +nxt_inline nxt_port_t * +nxt_router_app_port(nxt_task_t *task) +{ + nxt_port_t *port; + nxt_runtime_t *rt; + + rt = task->thread->runtime; + + nxt_runtime_port_each(rt, port) { + + if (nxt_pid == port->pid) { + continue; + } + + if (port->type == NXT_PROCESS_WORKER) { + return port; + } + + } nxt_runtime_port_loop; + + return NULL; +} + + +static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data) { - size_t size; + size_t size, preread; nxt_int_t ret; nxt_buf_t *b; nxt_conn_t *c; + nxt_app_parse_ctx_t *ap; nxt_socket_conf_joint_t *joint; - nxt_http_request_parse_t *rp; + nxt_app_request_header_t *h; c = obj; - rp = data; + ap = data; + b = c->read; nxt_debug(task, "router conn http header parse"); - if (rp == NULL) { - rp = nxt_mp_zget(c->mem_pool, sizeof(nxt_http_request_parse_t)); - if (nxt_slow_path(rp == NULL)) { + if (ap == NULL) { + ap = nxt_mp_zget(c->mem_pool, sizeof(nxt_app_parse_ctx_t)); + if (nxt_slow_path(ap == NULL)) { nxt_router_conn_close(task, c, data); return; } - c->socket.data = rp; - - ret = nxt_http_parse_request_init(rp, c->mem_pool); + ret = nxt_app_http_req_init(task, ap); if (nxt_slow_path(ret != NXT_OK)) { nxt_router_conn_close(task, c, data); return; } + + c->socket.data = ap; } - ret = nxt_http_parse_request(rp, &c->read->mem); + h = &ap->r.header; + + ret = nxt_app_http_req_parse(task, ap, b); nxt_debug(task, "http parse request: %d", ret); switch (nxt_expect(NXT_DONE, ret)) { case NXT_DONE: - break; + preread = nxt_buf_mem_used_size(&b->mem); + + nxt_debug(task, "router request header parsing complete, " + "content length: %O, preread: %uz", + h->parsed_content_length, preread); + + nxt_router_process_http_request(task, c, ap); + return; case NXT_ERROR: nxt_router_conn_close(task, c, data); @@ -965,32 +1133,122 @@ nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data) default: /* NXT_AGAIN */ - if (c->read->mem.free == c->read->mem.end) { - joint = c->listen->socket.data; - size = joint->socket_conf->large_header_buffer_size, + if (h->done == 0) { + + if (c->read->mem.free == c->read->mem.end) { + joint = c->listen->socket.data; + size = joint->socket_conf->large_header_buffer_size; + + if (size > (size_t) nxt_buf_mem_size(&b->mem)) { + b = nxt_buf_mem_alloc(c->mem_pool, size, 0); + if (nxt_slow_path(b == NULL)) { + nxt_router_conn_close(task, c, data); + return; + } + + size = c->read->mem.free - c->read->mem.pos; + nxt_memcpy(b->mem.pos, c->read->mem.pos, size); - b = nxt_buf_mem_alloc(c->mem_pool, size, 0); - if (nxt_slow_path(b == NULL)) { - nxt_router_conn_close(task, c, data); - return; + b->mem.free += size; + c->read = b; + } else { + // TODO 500 Too long request headers + nxt_log_alert(task->log, "Too long request headers"); + } } + } + + if (ap->r.body.done == 0) { + + preread = nxt_buf_mem_used_size(&b->mem); + + if (h->parsed_content_length - preread > + (size_t) nxt_buf_mem_free_size(&b->mem)) { + + b = nxt_buf_mem_alloc(c->mem_pool, h->parsed_content_length, 0); + if (nxt_slow_path(b == NULL)) { + // TODO 500 Failed to allocate buffer for request body + nxt_log_alert(task->log, "Failed to allocate buffer for " + "request body"); + } - size = c->read->mem.free - c->read->mem.pos; - nxt_memcpy(b->mem.pos, c->read->mem.pos, size); + b->mem.free = nxt_cpymem(b->mem.free, c->read->mem.pos, + preread); + + c->read = b; + } + + nxt_debug(task, "router request body read again, rest: %uz", + h->parsed_content_length - preread); - b->mem.free += size; - c->read = b; } - nxt_conn_read(task->thread->engine, c); - return; } - c->write = c->read; - c->write->mem.pos = c->write->mem.start; - c->write_state = &nxt_router_conn_write_state; + nxt_conn_read(task->thread->engine, c); +} + + +static void +nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, + nxt_app_parse_ctx_t *ap) +{ + nxt_port_t *port, *c_port; + nxt_req_id_t req_id; + nxt_app_wmsg_t wmsg; + nxt_event_engine_t *engine; + nxt_req_conn_link_t *rc; + + if (nxt_slow_path(nxt_app == NULL)) { + // 500 Application not found + nxt_log_alert(task->log, "application is NULL"); + } + + port = nxt_router_app_port(task); + + if (nxt_slow_path(port == NULL)) { + // 500 Application port not found + nxt_log_alert(task->log, "application port not found"); + } + + engine = task->thread->engine; + + do { + req_id = nxt_random(&nxt_random_data); + } while (nxt_event_engine_request_find(engine, req_id) != NULL); + + rc = nxt_conn_request_add(c, req_id); + if (nxt_slow_path(rc == NULL)) { + // 500 Failed to allocate req->conn link + nxt_log_alert(task->log, "failed to allocate req->conn link"); + } + + nxt_event_engine_request_add(engine, rc); + + nxt_debug(task, "req_id %uxD linked to conn %p at engine %p", + req_id, c, engine); - nxt_conn_write(task->thread->engine, c); + c_port = nxt_process_connected_port_find(port->process, + engine->port->pid, + engine->port->id); + if (nxt_slow_path(c_port != engine->port)) { + (void) nxt_port_send_port(task, port, engine->port); + nxt_process_connected_port_add(port->process, engine->port); + } + + wmsg.port = port; + wmsg.write = NULL; + wmsg.buf = &wmsg.write; + wmsg.stream = req_id; + + (void)nxt_app->prepare_msg(task, &ap->r, &wmsg); + + nxt_debug(task, "about to send %d bytes buffer to worker port %d", + nxt_buf_used_size(wmsg.write), + wmsg.port->socket.fd); + + (void) nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA, + -1, req_id, engine->port->id, wmsg.write); } @@ -1002,6 +1260,59 @@ static const nxt_conn_state_t nxt_router_conn_close_state static void +nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data) +{ + nxt_buf_t *b; + nxt_bool_t last; + nxt_conn_t *c; + nxt_work_queue_t *wq; + + nxt_debug(task, "router conn ready %p", obj); + + c = obj; + b = c->write; + + wq = &task->thread->engine->fast_work_queue; + + last = 0; + + while (b != NULL) { + if (!nxt_buf_is_sync(b)) { + if (nxt_buf_used_size(b) > 0) { + break; + } + } + + if (nxt_buf_is_last(b)) { + last = 1; + } + + nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); + + b = b->next; + } + + c->write = b; + + if (b != NULL) { + nxt_debug(task, "router conn %p has more data to write", obj); + + nxt_conn_write(task->thread->engine, c); + } else { + nxt_debug(task, "router conn %p no more data to write, last = %d", obj, + last); + + if (last != 0) { + nxt_debug(task, "enqueue router conn close %p (ready handler)", c); + + nxt_work_queue_add(wq, nxt_router_conn_close, task, c, + c->socket.data); + } + } +} + + +static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data) { nxt_conn_t *c; @@ -1020,6 +1331,7 @@ static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data) { nxt_conn_t *c; + nxt_req_conn_link_t *rc; nxt_socket_conf_joint_t *joint; c = obj; @@ -1029,6 +1341,14 @@ nxt_router_conn_free(nxt_task_t *task, void *obj, void *data) joint = c->listen->socket.data; nxt_router_conf_release(task, joint); + nxt_queue_each(rc, &c->requests, nxt_req_conn_link_t, link) { + + nxt_debug(task, "conn %p close, req %uxD", c, rc->req_id); + + nxt_event_engine_request_remove(task->thread->engine, rc); + + } nxt_queue_loop; + nxt_mp_destroy(c->mem_pool); } |