summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/nxt_controller.c306
1 files changed, 299 insertions, 7 deletions
diff --git a/src/nxt_controller.c b/src/nxt_controller.c
index d5092d57..2c8efd09 100644
--- a/src/nxt_controller.c
+++ b/src/nxt_controller.c
@@ -10,6 +10,12 @@
#include <nxt_master_process.h>
+typedef struct {
+ nxt_http_request_parse_t parser;
+ size_t length;
+} nxt_controller_request_t;
+
+
static void nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data);
static void nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data);
static nxt_msec_t nxt_controller_conn_timeout_value(nxt_event_conn_t *c,
@@ -18,17 +24,54 @@ static void nxt_controller_conn_read_error(nxt_task_t *task, void *obj,
void *data);
static void nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj,
void *data);
+static void nxt_controller_conn_body_read(nxt_task_t *task, void *obj,
+ void *data);
+static void nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data);
+static void nxt_controller_conn_write_error(nxt_task_t *task, void *obj,
+ void *data);
+static void nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj,
+ void *data);
static void nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data);
static void nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data);
+static nxt_int_t nxt_controller_request_content_length(void *ctx,
+ nxt_str_t *name, nxt_str_t *value, uintptr_t data);
+
+static void nxt_controller_process_request(nxt_task_t *task,
+ nxt_event_conn_t *c, nxt_controller_request_t *r);
+static nxt_int_t nxt_controller_request_body_parse(nxt_buf_mem_t *b);
+
+
+static nxt_http_fields_t nxt_controller_request_fields[] = {
+ { nxt_string("Content-Length"),
+ &nxt_controller_request_content_length, 0 },
+
+ { nxt_null_string, NULL, 0 }
+};
+
+
+static nxt_http_fields_hash_t *nxt_controller_request_fields_hash;
+
static const nxt_event_conn_state_t nxt_controller_conn_read_state;
+static const nxt_event_conn_state_t nxt_controller_conn_body_read_state;
+static const nxt_event_conn_state_t nxt_controller_conn_write_state;
static const nxt_event_conn_state_t nxt_controller_conn_close_state;
nxt_int_t
nxt_controller_start(nxt_task_t *task, nxt_runtime_t *rt)
{
+ nxt_http_fields_hash_t *hash;
+
+ hash = nxt_http_fields_hash(nxt_controller_request_fields, rt->mem_pool);
+
+ if (nxt_slow_path(hash == NULL)) {
+ return NXT_ERROR;
+ }
+
+ nxt_controller_request_fields_hash = hash;
+
if (nxt_event_conn_listen(task, rt->controller_socket) != NXT_OK) {
return NXT_ERROR;
}
@@ -116,14 +159,24 @@ nxt_runtime_controller_socket(nxt_task_t *task, nxt_runtime_t *rt)
static void
nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data)
{
- nxt_buf_t *b;
- nxt_event_conn_t *c;
- nxt_event_engine_t *engine;
+ nxt_buf_t *b;
+ nxt_event_conn_t *c;
+ nxt_event_engine_t *engine;
+ nxt_controller_request_t *r;
c = obj;
nxt_debug(task, "controller conn init fd:%d", c->socket.fd);
+ r = nxt_mem_zalloc(c->mem_pool, sizeof(nxt_controller_request_t));
+ if (nxt_slow_path(r == NULL)) {
+ nxt_controller_conn_free(task, c, NULL);
+ return;
+ }
+
+ r->parser.hash = nxt_controller_request_fields_hash;
+ r->parser.ctx = r;
+
b = nxt_buf_mem_alloc(c->mem_pool, 1024, 0);
if (nxt_slow_path(b == NULL)) {
nxt_controller_conn_free(task, c, NULL);
@@ -131,11 +184,13 @@ nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data)
}
c->read = b;
+ c->socket.data = r;
c->socket.read_ready = 1;
c->read_state = &nxt_controller_conn_read_state;
engine = task->thread->engine;
c->read_work_queue = &engine->read_work_queue;
+ c->write_work_queue = &engine->write_work_queue;
nxt_event_conn_read(engine, c);
}
@@ -160,13 +215,71 @@ static const nxt_event_conn_state_t nxt_controller_conn_read_state
static void
nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data)
{
- nxt_event_conn_t *c;
+ size_t preread;
+ nxt_buf_t *b;
+ nxt_int_t rc;
+ nxt_event_conn_t *c;
+ nxt_controller_request_t *r;
c = obj;
+ r = data;
nxt_debug(task, "controller conn read");
- nxt_controller_conn_close(task, c, c->socket.data);
+ nxt_queue_remove(&c->link);
+ nxt_queue_self(&c->link);
+
+ b = c->read;
+
+ rc = nxt_http_parse_request(&r->parser, &b->mem);
+
+ if (nxt_slow_path(rc != NXT_DONE)) {
+
+ if (rc == NXT_AGAIN) {
+ if (nxt_buf_mem_free_size(&b->mem) == 0) {
+ nxt_log(task, NXT_LOG_ERR, "too long request headers");
+ nxt_controller_conn_close(task, c, r);
+ return;
+ }
+
+ nxt_event_conn_read(task->thread->engine, c);
+ return;
+ }
+
+ /* rc == NXT_ERROR */
+
+ nxt_log(task, NXT_LOG_ERR, "parsing error");
+
+ nxt_controller_conn_close(task, c, r);
+ return;
+ }
+
+ preread = nxt_buf_mem_used_size(&b->mem);
+
+ nxt_debug(task, "controller request header parsing complete, "
+ "body length: %O, preread: %uz",
+ r->length, preread);
+
+ if (preread >= r->length) {
+ nxt_controller_process_request(task, c, r);
+ return;
+ }
+
+ if (r->length - preread > (size_t) nxt_buf_mem_free_size(&b->mem)) {
+ b = nxt_buf_mem_alloc(c->mem_pool, r->length, 0);
+ if (nxt_slow_path(b == NULL)) {
+ nxt_controller_conn_free(task, c, NULL);
+ return;
+ }
+
+ b->mem.free = nxt_cpymem(b->mem.free, c->read->mem.pos, preread);
+
+ c->read = b;
+ }
+
+ c->read_state = &nxt_controller_conn_body_read_state;
+
+ nxt_event_conn_read(task->thread->engine, c);
}
@@ -186,7 +299,7 @@ nxt_controller_conn_read_error(nxt_task_t *task, void *obj, void *data)
nxt_debug(task, "controller conn read error");
- nxt_controller_conn_close(task, c, c->socket.data);
+ nxt_controller_conn_close(task, c, data);
}
@@ -204,7 +317,121 @@ nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj, void *data)
nxt_debug(task, "controller conn read timeout");
- nxt_controller_conn_close(task, c, c->socket.data);
+ nxt_controller_conn_close(task, c, data);
+}
+
+
+static const nxt_event_conn_state_t nxt_controller_conn_body_read_state
+ nxt_aligned(64) =
+{
+ NXT_EVENT_NO_BUF_PROCESS,
+ NXT_EVENT_TIMER_AUTORESET,
+
+ nxt_controller_conn_body_read,
+ nxt_controller_conn_close,
+ nxt_controller_conn_read_error,
+
+ nxt_controller_conn_read_timeout,
+ nxt_controller_conn_timeout_value,
+ 60 * 1000,
+};
+
+
+static void
+nxt_controller_conn_body_read(nxt_task_t *task, void *obj, void *data)
+{
+ size_t rest;
+ nxt_buf_t *b;
+ nxt_event_conn_t *c;
+
+ c = obj;
+
+ nxt_debug(task, "controller conn body read");
+
+ b = c->read;
+
+ rest = nxt_buf_mem_free_size(&b->mem);
+
+ if (rest == 0) {
+ nxt_debug(task, "controller conn body read complete");
+
+ nxt_controller_process_request(task, c, data);
+ return;
+ }
+
+ nxt_debug(task, "controller conn body read again, rest: %uz", rest);
+
+ nxt_event_conn_read(task->thread->engine, c);
+}
+
+
+static const nxt_event_conn_state_t nxt_controller_conn_write_state
+ nxt_aligned(64) =
+{
+ NXT_EVENT_NO_BUF_PROCESS,
+ NXT_EVENT_TIMER_AUTORESET,
+
+ nxt_controller_conn_write,
+ NULL,
+ nxt_controller_conn_write_error,
+
+ nxt_controller_conn_write_timeout,
+ nxt_controller_conn_timeout_value,
+ 60 * 1000,
+};
+
+
+static void
+nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_buf_t *b;
+ nxt_event_conn_t *c;
+
+ c = obj;
+
+ nxt_debug(task, "controller conn write");
+
+ b = c->write;
+
+ if (b->mem.pos != b->mem.free) {
+ nxt_event_conn_write(task->thread->engine, c);
+ return;
+ }
+
+ nxt_debug(task, "controller conn write complete");
+
+ nxt_controller_conn_close(task, c, data);
+}
+
+
+static void
+nxt_controller_conn_write_error(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_event_conn_t *c;
+
+ c = obj;
+
+ nxt_debug(task, "controller conn write error");
+
+ nxt_controller_conn_close(task, c, data);
+}
+
+
+static void
+nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_timer_t *ev;
+ nxt_event_conn_t *c;
+
+ ev = obj;
+
+ c = nxt_event_write_timer_conn(ev);
+ c->socket.timedout = 1;
+ c->socket.closed = 1;
+
+ nxt_debug(task, "controller conn write timeout");
+
+ nxt_controller_conn_close(task, c, data);
}
@@ -233,6 +460,8 @@ nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data)
nxt_debug(task, "controller conn close");
+ nxt_queue_remove(&c->link);
+
c->write_state = &nxt_controller_conn_close_state;
nxt_event_conn_close(task->thread->engine, c);
@@ -252,3 +481,66 @@ nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data)
//nxt_free(c);
}
+
+
+static nxt_int_t
+nxt_controller_request_content_length(void *ctx, nxt_str_t *name,
+ nxt_str_t *value, uintptr_t data)
+{
+ off_t length;
+ nxt_controller_request_t *r;
+
+ r = ctx;
+
+ length = nxt_off_t_parse(value->start, value->length);
+
+ if (nxt_fast_path(length > 0)) {
+ /* TODO length too big */
+
+ r->length = length;
+ return NXT_OK;
+ }
+
+ /* TODO logging (task?) */
+
+ return NXT_ERROR;
+}
+
+
+static void
+nxt_controller_process_request(nxt_task_t *task, nxt_event_conn_t *c,
+ nxt_controller_request_t *r)
+{
+ size_t size;
+ nxt_buf_t *b, *wb;
+
+ static const u_char response[] = "HTTP/1.0 200 OK\r\n\r\n";
+
+ b = c->read;
+
+ nxt_controller_request_body_parse(&b->mem);
+
+ size = nxt_buf_mem_used_size(&b->mem);
+
+ wb = nxt_buf_mem_alloc(c->mem_pool, sizeof(response) - 1 + size, 0);
+ if (nxt_slow_path(wb == NULL)) {
+ nxt_controller_conn_close(task, c, r);
+ return;
+ }
+
+ wb->mem.free = nxt_cpymem(wb->mem.free, response, sizeof(response) - 1);
+ wb->mem.free = nxt_cpymem(wb->mem.free, b->mem.pos, size);
+
+ c->write = wb;
+ c->write_state = &nxt_controller_conn_write_state;
+
+ nxt_event_conn_write(task->thread->engine, c);
+}
+
+
+static nxt_int_t
+nxt_controller_request_body_parse(nxt_buf_mem_t *b)
+{
+ /* TODO */
+ return NXT_OK;
+}