diff options
-rw-r--r-- | src/nxt_controller.c | 306 |
1 files changed, 299 insertions, 7 deletions
diff --git a/src/nxt_controller.c b/src/nxt_controller.c index d5092d57..2c8efd09 100644 --- a/src/nxt_controller.c +++ b/src/nxt_controller.c @@ -10,6 +10,12 @@ #include <nxt_master_process.h> +typedef struct { + nxt_http_request_parse_t parser; + size_t length; +} nxt_controller_request_t; + + static void nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data); static void nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data); static nxt_msec_t nxt_controller_conn_timeout_value(nxt_event_conn_t *c, @@ -18,17 +24,54 @@ static void nxt_controller_conn_read_error(nxt_task_t *task, void *obj, void *data); static void nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj, void *data); +static void nxt_controller_conn_body_read(nxt_task_t *task, void *obj, + void *data); +static void nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data); +static void nxt_controller_conn_write_error(nxt_task_t *task, void *obj, + void *data); +static void nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj, + void *data); static void nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data); static void nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data); +static nxt_int_t nxt_controller_request_content_length(void *ctx, + nxt_str_t *name, nxt_str_t *value, uintptr_t data); + +static void nxt_controller_process_request(nxt_task_t *task, + nxt_event_conn_t *c, nxt_controller_request_t *r); +static nxt_int_t nxt_controller_request_body_parse(nxt_buf_mem_t *b); + + +static nxt_http_fields_t nxt_controller_request_fields[] = { + { nxt_string("Content-Length"), + &nxt_controller_request_content_length, 0 }, + + { nxt_null_string, NULL, 0 } +}; + + +static nxt_http_fields_hash_t *nxt_controller_request_fields_hash; + static const nxt_event_conn_state_t nxt_controller_conn_read_state; +static const nxt_event_conn_state_t nxt_controller_conn_body_read_state; +static const nxt_event_conn_state_t nxt_controller_conn_write_state; static const nxt_event_conn_state_t nxt_controller_conn_close_state; nxt_int_t nxt_controller_start(nxt_task_t *task, nxt_runtime_t *rt) { + nxt_http_fields_hash_t *hash; + + hash = nxt_http_fields_hash(nxt_controller_request_fields, rt->mem_pool); + + if (nxt_slow_path(hash == NULL)) { + return NXT_ERROR; + } + + nxt_controller_request_fields_hash = hash; + if (nxt_event_conn_listen(task, rt->controller_socket) != NXT_OK) { return NXT_ERROR; } @@ -116,14 +159,24 @@ nxt_runtime_controller_socket(nxt_task_t *task, nxt_runtime_t *rt) static void nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data) { - nxt_buf_t *b; - nxt_event_conn_t *c; - nxt_event_engine_t *engine; + nxt_buf_t *b; + nxt_event_conn_t *c; + nxt_event_engine_t *engine; + nxt_controller_request_t *r; c = obj; nxt_debug(task, "controller conn init fd:%d", c->socket.fd); + r = nxt_mem_zalloc(c->mem_pool, sizeof(nxt_controller_request_t)); + if (nxt_slow_path(r == NULL)) { + nxt_controller_conn_free(task, c, NULL); + return; + } + + r->parser.hash = nxt_controller_request_fields_hash; + r->parser.ctx = r; + b = nxt_buf_mem_alloc(c->mem_pool, 1024, 0); if (nxt_slow_path(b == NULL)) { nxt_controller_conn_free(task, c, NULL); @@ -131,11 +184,13 @@ nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data) } c->read = b; + c->socket.data = r; c->socket.read_ready = 1; c->read_state = &nxt_controller_conn_read_state; engine = task->thread->engine; c->read_work_queue = &engine->read_work_queue; + c->write_work_queue = &engine->write_work_queue; nxt_event_conn_read(engine, c); } @@ -160,13 +215,71 @@ static const nxt_event_conn_state_t nxt_controller_conn_read_state static void nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data) { - nxt_event_conn_t *c; + size_t preread; + nxt_buf_t *b; + nxt_int_t rc; + nxt_event_conn_t *c; + nxt_controller_request_t *r; c = obj; + r = data; nxt_debug(task, "controller conn read"); - nxt_controller_conn_close(task, c, c->socket.data); + nxt_queue_remove(&c->link); + nxt_queue_self(&c->link); + + b = c->read; + + rc = nxt_http_parse_request(&r->parser, &b->mem); + + if (nxt_slow_path(rc != NXT_DONE)) { + + if (rc == NXT_AGAIN) { + if (nxt_buf_mem_free_size(&b->mem) == 0) { + nxt_log(task, NXT_LOG_ERR, "too long request headers"); + nxt_controller_conn_close(task, c, r); + return; + } + + nxt_event_conn_read(task->thread->engine, c); + return; + } + + /* rc == NXT_ERROR */ + + nxt_log(task, NXT_LOG_ERR, "parsing error"); + + nxt_controller_conn_close(task, c, r); + return; + } + + preread = nxt_buf_mem_used_size(&b->mem); + + nxt_debug(task, "controller request header parsing complete, " + "body length: %O, preread: %uz", + r->length, preread); + + if (preread >= r->length) { + nxt_controller_process_request(task, c, r); + return; + } + + if (r->length - preread > (size_t) nxt_buf_mem_free_size(&b->mem)) { + b = nxt_buf_mem_alloc(c->mem_pool, r->length, 0); + if (nxt_slow_path(b == NULL)) { + nxt_controller_conn_free(task, c, NULL); + return; + } + + b->mem.free = nxt_cpymem(b->mem.free, c->read->mem.pos, preread); + + c->read = b; + } + + c->read_state = &nxt_controller_conn_body_read_state; + + nxt_event_conn_read(task->thread->engine, c); } @@ -186,7 +299,7 @@ nxt_controller_conn_read_error(nxt_task_t *task, void *obj, void *data) nxt_debug(task, "controller conn read error"); - nxt_controller_conn_close(task, c, c->socket.data); + nxt_controller_conn_close(task, c, data); } @@ -204,7 +317,121 @@ nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj, void *data) nxt_debug(task, "controller conn read timeout"); - nxt_controller_conn_close(task, c, c->socket.data); + nxt_controller_conn_close(task, c, data); +} + + +static const nxt_event_conn_state_t nxt_controller_conn_body_read_state + nxt_aligned(64) = +{ + NXT_EVENT_NO_BUF_PROCESS, + NXT_EVENT_TIMER_AUTORESET, + + nxt_controller_conn_body_read, + nxt_controller_conn_close, + nxt_controller_conn_read_error, + + nxt_controller_conn_read_timeout, + nxt_controller_conn_timeout_value, + 60 * 1000, +}; + + +static void +nxt_controller_conn_body_read(nxt_task_t *task, void *obj, void *data) +{ + size_t rest; + nxt_buf_t *b; + nxt_event_conn_t *c; + + c = obj; + + nxt_debug(task, "controller conn body read"); + + b = c->read; + + rest = nxt_buf_mem_free_size(&b->mem); + + if (rest == 0) { + nxt_debug(task, "controller conn body read complete"); + + nxt_controller_process_request(task, c, data); + return; + } + + nxt_debug(task, "controller conn body read again, rest: %uz", rest); + + nxt_event_conn_read(task->thread->engine, c); +} + + +static const nxt_event_conn_state_t nxt_controller_conn_write_state + nxt_aligned(64) = +{ + NXT_EVENT_NO_BUF_PROCESS, + NXT_EVENT_TIMER_AUTORESET, + + nxt_controller_conn_write, + NULL, + nxt_controller_conn_write_error, + + nxt_controller_conn_write_timeout, + nxt_controller_conn_timeout_value, + 60 * 1000, +}; + + +static void +nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data) +{ + nxt_buf_t *b; + nxt_event_conn_t *c; + + c = obj; + + nxt_debug(task, "controller conn write"); + + b = c->write; + + if (b->mem.pos != b->mem.free) { + nxt_event_conn_write(task->thread->engine, c); + return; + } + + nxt_debug(task, "controller conn write complete"); + + nxt_controller_conn_close(task, c, data); +} + + +static void +nxt_controller_conn_write_error(nxt_task_t *task, void *obj, void *data) +{ + nxt_event_conn_t *c; + + c = obj; + + nxt_debug(task, "controller conn write error"); + + nxt_controller_conn_close(task, c, data); +} + + +static void +nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj, void *data) +{ + nxt_timer_t *ev; + nxt_event_conn_t *c; + + ev = obj; + + c = nxt_event_write_timer_conn(ev); + c->socket.timedout = 1; + c->socket.closed = 1; + + nxt_debug(task, "controller conn write timeout"); + + nxt_controller_conn_close(task, c, data); } @@ -233,6 +460,8 @@ nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data) nxt_debug(task, "controller conn close"); + nxt_queue_remove(&c->link); + c->write_state = &nxt_controller_conn_close_state; nxt_event_conn_close(task->thread->engine, c); @@ -252,3 +481,66 @@ nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data) //nxt_free(c); } + + +static nxt_int_t +nxt_controller_request_content_length(void *ctx, nxt_str_t *name, + nxt_str_t *value, uintptr_t data) +{ + off_t length; + nxt_controller_request_t *r; + + r = ctx; + + length = nxt_off_t_parse(value->start, value->length); + + if (nxt_fast_path(length > 0)) { + /* TODO length too big */ + + r->length = length; + return NXT_OK; + } + + /* TODO logging (task?) */ + + return NXT_ERROR; +} + + +static void +nxt_controller_process_request(nxt_task_t *task, nxt_event_conn_t *c, + nxt_controller_request_t *r) +{ + size_t size; + nxt_buf_t *b, *wb; + + static const u_char response[] = "HTTP/1.0 200 OK\r\n\r\n"; + + b = c->read; + + nxt_controller_request_body_parse(&b->mem); + + size = nxt_buf_mem_used_size(&b->mem); + + wb = nxt_buf_mem_alloc(c->mem_pool, sizeof(response) - 1 + size, 0); + if (nxt_slow_path(wb == NULL)) { + nxt_controller_conn_close(task, c, r); + return; + } + + wb->mem.free = nxt_cpymem(wb->mem.free, response, sizeof(response) - 1); + wb->mem.free = nxt_cpymem(wb->mem.free, b->mem.pos, size); + + c->write = wb; + c->write_state = &nxt_controller_conn_write_state; + + nxt_event_conn_write(task->thread->engine, c); +} + + +static nxt_int_t +nxt_controller_request_body_parse(nxt_buf_mem_t *b) +{ + /* TODO */ + return NXT_OK; +} |