summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/nxt_conn.c34
-rw-r--r--src/nxt_conn.h18
-rw-r--r--src/nxt_event_engine.c112
-rw-r--r--src/nxt_event_engine.h10
-rw-r--r--src/nxt_main.h4
-rw-r--r--src/nxt_port.c2
-rw-r--r--src/nxt_process.c26
-rw-r--r--src/nxt_process.h12
-rw-r--r--src/nxt_router.c378
9 files changed, 563 insertions, 33 deletions
diff --git a/src/nxt_conn.c b/src/nxt_conn.c
index bba2bfd1..0cc052e8 100644
--- a/src/nxt_conn.c
+++ b/src/nxt_conn.c
@@ -85,6 +85,8 @@ nxt_conn_create(nxt_mp_t *mp, nxt_task_t *task)
nxt_conn_timer_init(&c->read_timer, c, c->socket.read_work_queue);
nxt_conn_timer_init(&c->write_timer, c, c->socket.write_work_queue);
+ nxt_queue_init(&c->requests);
+
nxt_log_debug(&c->log, "connections: %uD", thr->engine->connections);
return c;
@@ -150,3 +152,35 @@ nxt_conn_work_queue_set(nxt_conn_t *c, nxt_work_queue_t *wq)
c->read_timer.work_queue = wq;
c->write_timer.work_queue = wq;
}
+
+
+nxt_req_conn_link_t *
+nxt_conn_request_add(nxt_conn_t *c, nxt_req_id_t req_id)
+{
+ nxt_req_conn_link_t *rc;
+
+ rc = nxt_mp_zalloc(c->mem_pool, sizeof(nxt_req_conn_link_t));
+ if (nxt_slow_path(rc == NULL)) {
+ nxt_thread_log_error(NXT_LOG_WARN, "failed to allocate req %08uxD "
+ "to conn", req_id);
+ return NULL;
+ }
+
+ rc->req_id = req_id;
+ rc->conn = c;
+
+ nxt_queue_insert_tail(&c->requests, &rc->link);
+
+ return rc;
+}
+
+
+void
+nxt_conn_request_remove(nxt_conn_t *c, nxt_req_conn_link_t *rc)
+{
+ nxt_queue_remove(&rc->link);
+
+ nxt_mp_free(c->mem_pool, rc);
+}
+
+
diff --git a/src/nxt_conn.h b/src/nxt_conn.h
index 479450bb..6c4de090 100644
--- a/src/nxt_conn.h
+++ b/src/nxt_conn.h
@@ -141,6 +141,8 @@ struct nxt_conn_s {
nxt_conn_io_t *io;
+ nxt_queue_t requests; /* of nxt_req_conn_link_t */
+
#if (NXT_SSLTLS || NXT_THREADS)
/* SunC does not support "zero-sized struct/union". */
@@ -180,6 +182,16 @@ struct nxt_conn_s {
};
+typedef uint32_t nxt_req_id_t;
+
+typedef struct {
+ nxt_req_id_t req_id;
+ nxt_conn_t *conn;
+
+ nxt_queue_link_t link;
+} nxt_req_conn_link_t;
+
+
#define nxt_conn_timer_init(ev, c, wq) \
do { \
(ev)->work_queue = (wq); \
@@ -347,4 +359,10 @@ NXT_EXPORT void nxt_conn_proxy(nxt_task_t *task, nxt_conn_proxy_t *p);
#define nxt_event_conn_close nxt_conn_close
+NXT_EXPORT nxt_req_conn_link_t *nxt_conn_request_add(nxt_conn_t *c,
+ nxt_req_id_t req_id);
+NXT_EXPORT void nxt_conn_request_remove(nxt_conn_t *c,
+ nxt_req_conn_link_t *rc);
+
+
#endif /* _NXT_CONN_H_INCLUDED_ */
diff --git a/src/nxt_event_engine.c b/src/nxt_event_engine.c
index d636dc25..9171bcd6 100644
--- a/src/nxt_event_engine.c
+++ b/src/nxt_event_engine.c
@@ -544,3 +544,115 @@ nxt_event_engine_start(nxt_event_engine_t *engine)
nxt_timer_expire(engine, now);
}
}
+
+
+static nxt_int_t
+nxt_req_conn_test(nxt_lvlhsh_query_t *lhq, void *data)
+{
+ return NXT_OK;
+}
+
+static const nxt_lvlhsh_proto_t lvlhsh_req_conn_proto nxt_aligned(64) = {
+ NXT_LVLHSH_DEFAULT,
+ nxt_req_conn_test,
+ nxt_lvlhsh_alloc,
+ nxt_lvlhsh_free,
+};
+
+
+void
+nxt_event_engine_request_add(nxt_event_engine_t *engine,
+ nxt_req_conn_link_t *rc)
+{
+ nxt_lvlhsh_query_t lhq;
+
+ lhq.key_hash = nxt_murmur_hash2(&rc->req_id, sizeof(rc->req_id));
+ lhq.key.length = sizeof(rc->req_id);
+ lhq.key.start = (u_char *) &rc->req_id;
+ lhq.proto = &lvlhsh_req_conn_proto;
+ lhq.replace = 0;
+ lhq.value = rc;
+ lhq.pool = engine->mem_pool;
+
+ switch (nxt_lvlhsh_insert(&engine->requests, &lhq)) {
+
+ case NXT_OK:
+ break;
+
+ default:
+ nxt_thread_log_error(NXT_LOG_WARN, "req %08uxD to conn add failed",
+ rc->req_id);
+ break;
+ }
+}
+
+
+nxt_req_conn_link_t *
+nxt_event_engine_request_find(nxt_event_engine_t *engine, nxt_req_id_t req_id)
+{
+ nxt_lvlhsh_query_t lhq;
+
+ lhq.key_hash = nxt_murmur_hash2(&req_id, sizeof(req_id));
+ lhq.key.length = sizeof(req_id);
+ lhq.key.start = (u_char *) &req_id;
+ lhq.proto = &lvlhsh_req_conn_proto;
+
+ if (nxt_lvlhsh_find(&engine->requests, &lhq) == NXT_OK) {
+ return lhq.value;
+ }
+
+ return NULL;
+}
+
+
+void
+nxt_event_engine_request_remove(nxt_event_engine_t *engine,
+ nxt_req_conn_link_t *rc)
+{
+ nxt_lvlhsh_query_t lhq;
+
+ lhq.key_hash = nxt_murmur_hash2(&rc->req_id, sizeof(rc->req_id));
+ lhq.key.length = sizeof(rc->req_id);
+ lhq.key.start = (u_char *) &rc->req_id;
+ lhq.proto = &lvlhsh_req_conn_proto;
+ lhq.pool = engine->mem_pool;
+
+ switch (nxt_lvlhsh_delete(&engine->requests, &lhq)) {
+
+ case NXT_OK:
+ break;
+
+ default:
+ nxt_thread_log_error(NXT_LOG_WARN, "req %08uxD to conn remove failed",
+ rc->req_id);
+ break;
+ }
+}
+
+
+nxt_req_conn_link_t *
+nxt_event_engine_request_find_remove(nxt_event_engine_t *engine,
+ nxt_req_id_t req_id)
+{
+ nxt_lvlhsh_query_t lhq;
+
+ lhq.key_hash = nxt_murmur_hash2(&req_id, sizeof(req_id));
+ lhq.key.length = sizeof(req_id);
+ lhq.key.start = (u_char *) &req_id;
+ lhq.proto = &lvlhsh_req_conn_proto;
+ lhq.pool = engine->mem_pool;
+
+ switch (nxt_lvlhsh_delete(&engine->requests, &lhq)) {
+
+ case NXT_OK:
+ return lhq.value;
+
+ default:
+ nxt_thread_log_error(NXT_LOG_WARN, "req %08uxD to conn remove failed",
+ req_id);
+ break;
+ }
+
+ return NULL;
+}
+
diff --git a/src/nxt_event_engine.h b/src/nxt_event_engine.h
index d58949ba..ceb1654e 100644
--- a/src/nxt_event_engine.h
+++ b/src/nxt_event_engine.h
@@ -491,6 +491,7 @@ struct nxt_event_engine_s {
nxt_queue_t joints;
nxt_queue_t listen_connections;
nxt_queue_t idle_connections;
+ nxt_lvlhsh_t requests; /* req_id to nxt_req_conn_link_t */
nxt_queue_link_t link;
};
@@ -509,6 +510,15 @@ NXT_EXPORT void nxt_event_engine_post(nxt_event_engine_t *engine,
NXT_EXPORT void nxt_event_engine_signal(nxt_event_engine_t *engine,
nxt_uint_t signo);
+NXT_EXPORT void nxt_event_engine_request_add(nxt_event_engine_t *engine,
+ nxt_req_conn_link_t *rc);
+NXT_EXPORT nxt_req_conn_link_t *nxt_event_engine_request_find(
+ nxt_event_engine_t *engine, nxt_req_id_t req_id);
+NXT_EXPORT void nxt_event_engine_request_remove(nxt_event_engine_t *engine,
+ nxt_req_conn_link_t *rc);
+NXT_EXPORT nxt_req_conn_link_t *nxt_event_engine_request_find_remove(
+ nxt_event_engine_t *engine, nxt_req_id_t req_id);
+
nxt_inline nxt_event_engine_t *
nxt_thread_event_engine(void)
diff --git a/src/nxt_main.h b/src/nxt_main.h
index 0aa2192c..4b9a9036 100644
--- a/src/nxt_main.h
+++ b/src/nxt_main.h
@@ -26,7 +26,6 @@ typedef struct nxt_runtime_s nxt_runtime_t;
typedef uint16_t nxt_port_id_t;
#include <nxt_queue.h>
-#include <nxt_process.h>
typedef struct nxt_thread_s nxt_thread_t;
#include <nxt_thread_id.h>
@@ -36,6 +35,8 @@ typedef struct nxt_thread_s nxt_thread_t;
#include <nxt_random.h>
#include <nxt_string.h>
+#include <nxt_lvlhsh.h>
+#include <nxt_process.h>
#include <nxt_utf8.h>
#include <nxt_file_name.h>
@@ -90,7 +91,6 @@ typedef struct nxt_thread_pool_s nxt_thread_pool_t;
#include <nxt_djb_hash.h>
#include <nxt_murmur_hash.h>
-#include <nxt_lvlhsh.h>
#include <nxt_hash.h>
#include <nxt_sort.h>
diff --git a/src/nxt_port.c b/src/nxt_port.c
index eb320640..91f0fe31 100644
--- a/src/nxt_port.c
+++ b/src/nxt_port.c
@@ -142,8 +142,6 @@ nxt_port_new_port_buf_completion(nxt_task_t *task, void *obj, void *data)
b = obj;
mp = b->data;
- /* TODO: b->mem.pos */
-
nxt_buf_free(mp, b);
}
diff --git a/src/nxt_process.c b/src/nxt_process.c
index ce16cf7f..33af310d 100644
--- a/src/nxt_process.c
+++ b/src/nxt_process.c
@@ -540,3 +540,29 @@ nxt_process_port_new(nxt_process_t *process)
return port;
}
+
+void
+nxt_process_connected_port_add(nxt_process_t *process, nxt_port_t *port)
+{
+ /* TODO lock ports */
+
+ nxt_port_hash_add(&process->connected_ports, process->mem_pool, port);
+}
+
+void
+nxt_process_connected_port_remove(nxt_process_t *process, nxt_port_t *port)
+{
+ /* TODO lock ports */
+
+ nxt_port_hash_remove(&process->connected_ports, process->mem_pool, port);
+}
+
+nxt_port_t *
+nxt_process_connected_port_find(nxt_process_t *process, nxt_pid_t pid,
+ nxt_port_id_t port_id)
+{
+ /* TODO lock ports */
+
+ return nxt_port_hash_find(&process->connected_ports, pid, port_id);
+}
+
diff --git a/src/nxt_process.h b/src/nxt_process.h
index 669c8f03..32a8d8dc 100644
--- a/src/nxt_process.h
+++ b/src/nxt_process.h
@@ -58,6 +58,8 @@ typedef struct {
nxt_process_init_t *init;
nxt_array_t *incoming; /* of nxt_mmap_t */
nxt_array_t *outgoing; /* of nxt_mmap_t */
+
+ nxt_lvlhsh_t connected_ports; /* of nxt_port_t */
} nxt_process_t;
@@ -88,6 +90,16 @@ NXT_EXPORT nxt_port_t * nxt_process_port_new(nxt_process_t *process);
#define nxt_process_port_loop \
nxt_queue_loop
+
+void nxt_process_connected_port_add(nxt_process_t *process, nxt_port_t *port);
+
+void nxt_process_connected_port_remove(nxt_process_t *process,
+ nxt_port_t *port);
+
+nxt_port_t *nxt_process_connected_port_find(nxt_process_t *process,
+ nxt_pid_t pid, nxt_port_id_t port_id);
+
+
#if (NXT_HAVE_SETPROCTITLE)
#define nxt_process_title(task, fmt, ...) \
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 75edba40..d55862f7 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -6,6 +6,7 @@
*/
#include <nxt_router.h>
+#include <nxt_application.h>
static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task,
@@ -64,6 +65,9 @@ static void nxt_router_conf_release(nxt_task_t *task,
static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj,
void *data);
+static void nxt_router_process_http_request(nxt_task_t *task,
+ nxt_conn_t *c, nxt_app_parse_ctx_t *ap);
+static void nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data);
@@ -79,6 +83,11 @@ nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt)
nxt_router_temp_conf_t *tmcf;
const nxt_event_interface_t *interface;
+ ret = nxt_app_http_init(task, rt);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return ret;
+ }
+
router = nxt_zalloc(sizeof(nxt_router_t));
if (nxt_slow_path(router == NULL)) {
return NXT_ERROR;
@@ -519,6 +528,7 @@ nxt_router_engine_joints_create(nxt_task_t *task, nxt_mp_t *mp,
joint->count = 1;
joint->socket_conf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
+ joint->engine = recf->engine;
}
return NXT_OK;
@@ -578,7 +588,10 @@ static nxt_int_t
nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
nxt_event_engine_t *engine)
{
+ nxt_mp_t *mp;
nxt_int_t ret;
+ nxt_port_t *port;
+ nxt_process_t *process;
nxt_thread_link_t *link;
nxt_thread_handle_t handle;
@@ -596,6 +609,36 @@ nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
nxt_queue_insert_tail(&rt->engines, &engine->link);
+
+ process = nxt_runtime_process_find(rt, nxt_pid);
+ if (nxt_slow_path(process == NULL)) {
+ return NXT_ERROR;
+ }
+
+ port = nxt_process_port_new(process);
+ if (nxt_slow_path(port == NULL)) {
+ return NXT_ERROR;
+ }
+
+ ret = nxt_port_socket_init(task, port, 0);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return ret;
+ }
+
+ mp = nxt_mp_create(1024, 128, 256, 32);
+ if (nxt_slow_path(mp == NULL)) {
+ return NXT_ERROR;
+ }
+
+ port->mem_pool = mp;
+ port->engine = 0;
+ port->type = NXT_PROCESS_ROUTER;
+
+ engine->port = port;
+
+ nxt_runtime_port_add(rt, port);
+
+
ret = nxt_thread_create(&handle, link);
if (nxt_slow_path(ret != NXT_OK)) {
@@ -637,14 +680,28 @@ nxt_router_engine_post(nxt_router_engine_conf_t *recf)
static void
+nxt_router_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
+
+nxt_port_handler_t nxt_router_process_port_handlers[] = {
+ NULL,
+ nxt_port_new_port_handler,
+ nxt_port_change_log_file_handler,
+ nxt_port_mmap_handler,
+ nxt_router_data_handler,
+};
+
+
+static void
nxt_router_thread_start(void *data)
{
+ nxt_task_t *task;
nxt_thread_t *thread;
nxt_thread_link_t *link;
nxt_event_engine_t *engine;
link = data;
engine = link->engine;
+ task = &engine->task;
thread = nxt_thread();
@@ -657,6 +714,9 @@ nxt_router_thread_start(void *data)
thread->task = &engine->task;
thread->fiber = &engine->fibers->fiber;
+ engine->port->socket.task = task;
+ nxt_port_create(task, engine->port, nxt_router_process_port_handlers);
+
engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
nxt_event_engine_start(engine);
@@ -913,51 +973,159 @@ nxt_router_conn_init(nxt_task_t *task, void *obj, void *data)
static const nxt_conn_state_t nxt_router_conn_write_state
nxt_aligned(64) =
{
- .ready_handler = nxt_router_conn_close,
+ .ready_handler = nxt_router_conn_ready,
.close_handler = nxt_router_conn_close,
.error_handler = nxt_router_conn_error,
};
static void
+nxt_router_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
+{
+ size_t dump_size;
+ nxt_buf_t *b, *i, *last;
+ nxt_conn_t *c;
+ nxt_work_queue_t *wq;
+ nxt_req_conn_link_t *rc;
+ nxt_event_engine_t *engine;
+
+ b = msg->buf;
+ engine = task->thread->engine;
+ wq = &engine->fast_work_queue;
+
+ rc = nxt_event_engine_request_find(engine, msg->port_msg.stream);
+ if (nxt_slow_path(rc == NULL)) {
+
+ nxt_debug(task, "request id %08uxD not found", msg->port_msg.stream);
+
+ /* complete buffer(s) */
+ for (i = b; i != NULL; i = i->next) {
+ i->mem.pos = i->mem.free;
+
+ nxt_work_queue_add(wq, i->completion_handler, task, i, i->parent);
+ }
+
+ return;
+ }
+
+ c = rc->conn;
+
+ dump_size = nxt_buf_used_size(b);
+
+ if (dump_size > 300) {
+ dump_size = 300;
+ }
+
+ nxt_debug(task, "%srouter data (%z): %*s",
+ msg->port_msg.last ? "last " : "", msg->size, dump_size,
+ b->mem.pos);
+
+ if (msg->size == 0) {
+ b = NULL;
+ }
+
+ if (msg->port_msg.last != 0) {
+ nxt_debug(task, "router data create last buf");
+
+ last = nxt_buf_sync_alloc(c->mem_pool, NXT_BUF_SYNC_LAST);
+ if (nxt_slow_path(last == NULL)) {
+ /* TODO pogorevaTb */
+ }
+
+ nxt_buf_chain_add(&b, last);
+ }
+
+ if (b == NULL) {
+ return;
+ }
+
+ if (c->write == NULL) {
+ c->write = b;
+ c->write_state = &nxt_router_conn_write_state;
+
+ nxt_conn_write(task->thread->engine, c);
+ } else {
+ nxt_debug(task, "router data attach out bufs to existing chain");
+
+ nxt_buf_chain_add(&c->write, b);
+ }
+}
+
+
+nxt_inline nxt_port_t *
+nxt_router_app_port(nxt_task_t *task)
+{
+ nxt_port_t *port;
+ nxt_runtime_t *rt;
+
+ rt = task->thread->runtime;
+
+ nxt_runtime_port_each(rt, port) {
+
+ if (nxt_pid == port->pid) {
+ continue;
+ }
+
+ if (port->type == NXT_PROCESS_WORKER) {
+ return port;
+ }
+
+ } nxt_runtime_port_loop;
+
+ return NULL;
+}
+
+
+static void
nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data)
{
- size_t size;
+ size_t size, preread;
nxt_int_t ret;
nxt_buf_t *b;
nxt_conn_t *c;
+ nxt_app_parse_ctx_t *ap;
nxt_socket_conf_joint_t *joint;
- nxt_http_request_parse_t *rp;
+ nxt_app_request_header_t *h;
c = obj;
- rp = data;
+ ap = data;
+ b = c->read;
nxt_debug(task, "router conn http header parse");
- if (rp == NULL) {
- rp = nxt_mp_zget(c->mem_pool, sizeof(nxt_http_request_parse_t));
- if (nxt_slow_path(rp == NULL)) {
+ if (ap == NULL) {
+ ap = nxt_mp_zget(c->mem_pool, sizeof(nxt_app_parse_ctx_t));
+ if (nxt_slow_path(ap == NULL)) {
nxt_router_conn_close(task, c, data);
return;
}
- c->socket.data = rp;
-
- ret = nxt_http_parse_request_init(rp, c->mem_pool);
+ ret = nxt_app_http_req_init(task, ap);
if (nxt_slow_path(ret != NXT_OK)) {
nxt_router_conn_close(task, c, data);
return;
}
+
+ c->socket.data = ap;
}
- ret = nxt_http_parse_request(rp, &c->read->mem);
+ h = &ap->r.header;
+
+ ret = nxt_app_http_req_parse(task, ap, b);
nxt_debug(task, "http parse request: %d", ret);
switch (nxt_expect(NXT_DONE, ret)) {
case NXT_DONE:
- break;
+ preread = nxt_buf_mem_used_size(&b->mem);
+
+ nxt_debug(task, "router request header parsing complete, "
+ "content length: %O, preread: %uz",
+ h->parsed_content_length, preread);
+
+ nxt_router_process_http_request(task, c, ap);
+ return;
case NXT_ERROR:
nxt_router_conn_close(task, c, data);
@@ -965,32 +1133,122 @@ nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data)
default: /* NXT_AGAIN */
- if (c->read->mem.free == c->read->mem.end) {
- joint = c->listen->socket.data;
- size = joint->socket_conf->large_header_buffer_size,
+ if (h->done == 0) {
+
+ if (c->read->mem.free == c->read->mem.end) {
+ joint = c->listen->socket.data;
+ size = joint->socket_conf->large_header_buffer_size;
+
+ if (size > (size_t) nxt_buf_mem_size(&b->mem)) {
+ b = nxt_buf_mem_alloc(c->mem_pool, size, 0);
+ if (nxt_slow_path(b == NULL)) {
+ nxt_router_conn_close(task, c, data);
+ return;
+ }
+
+ size = c->read->mem.free - c->read->mem.pos;
+ nxt_memcpy(b->mem.pos, c->read->mem.pos, size);
- b = nxt_buf_mem_alloc(c->mem_pool, size, 0);
- if (nxt_slow_path(b == NULL)) {
- nxt_router_conn_close(task, c, data);
- return;
+ b->mem.free += size;
+ c->read = b;
+ } else {
+ // TODO 500 Too long request headers
+ nxt_log_alert(task->log, "Too long request headers");
+ }
}
+ }
+
+ if (ap->r.body.done == 0) {
+
+ preread = nxt_buf_mem_used_size(&b->mem);
+
+ if (h->parsed_content_length - preread >
+ (size_t) nxt_buf_mem_free_size(&b->mem)) {
+
+ b = nxt_buf_mem_alloc(c->mem_pool, h->parsed_content_length, 0);
+ if (nxt_slow_path(b == NULL)) {
+ // TODO 500 Failed to allocate buffer for request body
+ nxt_log_alert(task->log, "Failed to allocate buffer for "
+ "request body");
+ }
- size = c->read->mem.free - c->read->mem.pos;
- nxt_memcpy(b->mem.pos, c->read->mem.pos, size);
+ b->mem.free = nxt_cpymem(b->mem.free, c->read->mem.pos,
+ preread);
+
+ c->read = b;
+ }
+
+ nxt_debug(task, "router request body read again, rest: %uz",
+ h->parsed_content_length - preread);
- b->mem.free += size;
- c->read = b;
}
- nxt_conn_read(task->thread->engine, c);
- return;
}
- c->write = c->read;
- c->write->mem.pos = c->write->mem.start;
- c->write_state = &nxt_router_conn_write_state;
+ nxt_conn_read(task->thread->engine, c);
+}
+
+
+static void
+nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
+ nxt_app_parse_ctx_t *ap)
+{
+ nxt_port_t *port, *c_port;
+ nxt_req_id_t req_id;
+ nxt_app_wmsg_t wmsg;
+ nxt_event_engine_t *engine;
+ nxt_req_conn_link_t *rc;
+
+ if (nxt_slow_path(nxt_app == NULL)) {
+ // 500 Application not found
+ nxt_log_alert(task->log, "application is NULL");
+ }
+
+ port = nxt_router_app_port(task);
+
+ if (nxt_slow_path(port == NULL)) {
+ // 500 Application port not found
+ nxt_log_alert(task->log, "application port not found");
+ }
+
+ engine = task->thread->engine;
+
+ do {
+ req_id = nxt_random(&nxt_random_data);
+ } while (nxt_event_engine_request_find(engine, req_id) != NULL);
+
+ rc = nxt_conn_request_add(c, req_id);
+ if (nxt_slow_path(rc == NULL)) {
+ // 500 Failed to allocate req->conn link
+ nxt_log_alert(task->log, "failed to allocate req->conn link");
+ }
+
+ nxt_event_engine_request_add(engine, rc);
+
+ nxt_debug(task, "req_id %uxD linked to conn %p at engine %p",
+ req_id, c, engine);
- nxt_conn_write(task->thread->engine, c);
+ c_port = nxt_process_connected_port_find(port->process,
+ engine->port->pid,
+ engine->port->id);
+ if (nxt_slow_path(c_port != engine->port)) {
+ (void) nxt_port_send_port(task, port, engine->port);
+ nxt_process_connected_port_add(port->process, engine->port);
+ }
+
+ wmsg.port = port;
+ wmsg.write = NULL;
+ wmsg.buf = &wmsg.write;
+ wmsg.stream = req_id;
+
+ (void)nxt_app->prepare_msg(task, &ap->r, &wmsg);
+
+ nxt_debug(task, "about to send %d bytes buffer to worker port %d",
+ nxt_buf_used_size(wmsg.write),
+ wmsg.port->socket.fd);
+
+ (void) nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA,
+ -1, req_id, engine->port->id, wmsg.write);
}
@@ -1002,6 +1260,59 @@ static const nxt_conn_state_t nxt_router_conn_close_state
static void
+nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_buf_t *b;
+ nxt_bool_t last;
+ nxt_conn_t *c;
+ nxt_work_queue_t *wq;
+
+ nxt_debug(task, "router conn ready %p", obj);
+
+ c = obj;
+ b = c->write;
+
+ wq = &task->thread->engine->fast_work_queue;
+
+ last = 0;
+
+ while (b != NULL) {
+ if (!nxt_buf_is_sync(b)) {
+ if (nxt_buf_used_size(b) > 0) {
+ break;
+ }
+ }
+
+ if (nxt_buf_is_last(b)) {
+ last = 1;
+ }
+
+ nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
+
+ b = b->next;
+ }
+
+ c->write = b;
+
+ if (b != NULL) {
+ nxt_debug(task, "router conn %p has more data to write", obj);
+
+ nxt_conn_write(task->thread->engine, c);
+ } else {
+ nxt_debug(task, "router conn %p no more data to write, last = %d", obj,
+ last);
+
+ if (last != 0) {
+ nxt_debug(task, "enqueue router conn close %p (ready handler)", c);
+
+ nxt_work_queue_add(wq, nxt_router_conn_close, task, c,
+ c->socket.data);
+ }
+ }
+}
+
+
+static void
nxt_router_conn_close(nxt_task_t *task, void *obj, void *data)
{
nxt_conn_t *c;
@@ -1020,6 +1331,7 @@ static void
nxt_router_conn_free(nxt_task_t *task, void *obj, void *data)
{
nxt_conn_t *c;
+ nxt_req_conn_link_t *rc;
nxt_socket_conf_joint_t *joint;
c = obj;
@@ -1029,6 +1341,14 @@ nxt_router_conn_free(nxt_task_t *task, void *obj, void *data)
joint = c->listen->socket.data;
nxt_router_conf_release(task, joint);
+ nxt_queue_each(rc, &c->requests, nxt_req_conn_link_t, link) {
+
+ nxt_debug(task, "conn %p close, req %uxD", c, rc->req_id);
+
+ nxt_event_engine_request_remove(task->thread->engine, rc);
+
+ } nxt_queue_loop;
+
nxt_mp_destroy(c->mem_pool);
}