diff options
author | Max Romanov <max.romanov@nginx.com> | 2017-08-11 18:04:04 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2017-08-11 18:04:04 +0300 |
commit | 39a6a4c973dd378f1fad9d2514d7857fe491df4b (patch) | |
tree | 220ea8605a9396a93fa7a459ff3e558fe1a044aa /src/nxt_router.c | |
parent | e1e808bd94609c80b4990939285d47f124bb2eef (diff) | |
download | unit-39a6a4c973dd378f1fad9d2514d7857fe491df4b.tar.gz unit-39a6a4c973dd378f1fad9d2514d7857fe491df4b.tar.bz2 |
Request body read state implemented.
With specific timeout and buffer size settings.
Diffstat (limited to '')
-rw-r--r-- | src/nxt_router.c | 242 |
1 files changed, 191 insertions, 51 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c index 5beceb38..fa2e957d 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -128,6 +128,8 @@ static void nxt_router_app_release_port(nxt_task_t *task, void *obj, static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data); +static void nxt_router_conn_http_body_read(nxt_task_t *task, void *obj, + void *data); static void nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, nxt_app_parse_ctx_t *ap); static void nxt_router_process_http_request_mp(nxt_task_t *task, @@ -604,10 +606,34 @@ static nxt_conf_map_t nxt_router_http_conf[] = { }, { + nxt_string("large_header_buffers"), + NXT_CONF_MAP_SIZE, + offsetof(nxt_socket_conf_t, large_header_buffers), + }, + + { + nxt_string("body_buffer_size"), + NXT_CONF_MAP_SIZE, + offsetof(nxt_socket_conf_t, body_buffer_size), + }, + + { + nxt_string("max_body_size"), + NXT_CONF_MAP_SIZE, + offsetof(nxt_socket_conf_t, max_body_size), + }, + + { nxt_string("header_read_timeout"), NXT_CONF_MAP_MSEC, offsetof(nxt_socket_conf_t, header_read_timeout), }, + + { + nxt_string("body_read_timeout"), + NXT_CONF_MAP_MSEC, + offsetof(nxt_socket_conf_t, body_read_timeout), + }, }; @@ -792,7 +818,11 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, // STUB, default values if http block is not defined. skcf->header_buffer_size = 2048; skcf->large_header_buffer_size = 8192; + skcf->large_header_buffers = 4; + skcf->body_buffer_size = 16 * 1024; + skcf->max_body_size = 2 * 1024 * 1024; skcf->header_read_timeout = 5000; + skcf->body_read_timeout = 5000; if (http != NULL) { ret = nxt_conf_map_object(http, nxt_router_http_conf, @@ -1807,7 +1837,7 @@ nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data) } -static const nxt_conn_state_t nxt_router_conn_read_state +static const nxt_conn_state_t nxt_router_conn_read_header_state nxt_aligned(64) = { .ready_handler = nxt_router_conn_http_header_parse, @@ -1820,6 +1850,20 @@ static const nxt_conn_state_t nxt_router_conn_read_state }; +static const nxt_conn_state_t nxt_router_conn_read_body_state + nxt_aligned(64) = +{ + .ready_handler = nxt_router_conn_http_body_read, + .close_handler = nxt_router_conn_close, + .error_handler = nxt_router_conn_error, + + .timer_handler = nxt_router_conn_timeout, + .timer_value = nxt_router_conn_timeout_value, + .timer_data = offsetof(nxt_socket_conf_t, body_read_timeout), + .timer_autoreset = 1, +}; + + static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data) { @@ -1844,7 +1888,7 @@ nxt_router_conn_init(nxt_task_t *task, void *obj, void *data) c->read_work_queue = &engine->fast_work_queue; c->write_work_queue = &engine->fast_work_queue; - c->read_state = &nxt_router_conn_read_state; + c->read_state = &nxt_router_conn_read_header_state; nxt_conn_read(engine, c); } @@ -1873,7 +1917,6 @@ nxt_router_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) rc = nxt_event_engine_request_find(engine, msg->port_msg.stream); if (nxt_slow_path(rc == NULL)) { - nxt_debug(task, "request id %08uxD not found", msg->port_msg.stream); return; @@ -1910,14 +1953,18 @@ nxt_router_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) rc->app_port = NULL; } + + rc->conn = NULL; } if (b == NULL) { return; } - /* Disable instant buffer completion/re-using by port. */ - msg->buf = NULL; + if (msg->buf == b) { + /* Disable instant buffer completion/re-using by port. */ + msg->buf = NULL; + } if (c->write == NULL) { c->write = b; @@ -1938,6 +1985,9 @@ nxt_router_text_by_code(int code) case 400: return "Bad request"; case 404: return "Not found"; case 403: return "Forbidden"; + case 408: return "Request Timeout"; + case 411: return "Length Required"; + case 413: return "Request Entity Too Large"; case 500: default: return "Internal server error"; } @@ -1965,6 +2015,7 @@ nxt_router_get_error_buf(nxt_task_t *task, nxt_mp_t *mp, int code, msg = (const char *) b->mem.free; b->mem.free = nxt_vsprintf(b->mem.free, b->mem.end, fmt, args); + b->mem.free[0] = '\0'; nxt_log_alert(task->log, "error %d: %s", code, msg); @@ -1996,6 +2047,11 @@ nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code, b = nxt_router_get_error_buf(task, c->mem_pool, code, fmt, args); va_end(args); + if (c->socket.data != NULL) { + nxt_mp_free(c->mem_pool, c->socket.data); + c->socket.data = NULL; + } + if (c->write == NULL) { c->write = b; c->write_state = &nxt_router_conn_write_state; @@ -2345,22 +2401,24 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra) static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data) { - size_t size, preread; + size_t size; nxt_int_t ret; - nxt_buf_t *b; + nxt_buf_t *buf; nxt_conn_t *c; nxt_app_parse_ctx_t *ap; + nxt_app_request_body_t *b; nxt_socket_conf_joint_t *joint; nxt_app_request_header_t *h; c = obj; ap = data; - b = c->read; + buf = c->read; + joint = c->listen->socket.data; nxt_debug(task, "router conn http header parse"); if (ap == NULL) { - ap = nxt_mp_zget(c->mem_pool, sizeof(nxt_app_parse_ctx_t)); + ap = nxt_mp_zalloc(c->mem_pool, sizeof(nxt_app_parse_ctx_t)); if (nxt_slow_path(ap == NULL)) { nxt_router_conn_close(task, c, data); return; @@ -2376,78 +2434,157 @@ nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data) ap->r.remote.start = nxt_sockaddr_address(c->remote); ap->r.remote.length = c->remote->address_length; + + ap->r.header.buf = buf; } h = &ap->r.header; + b = &ap->r.body; - ret = nxt_app_http_req_parse(task, ap, b); + ret = nxt_app_http_req_header_parse(task, ap, buf); - nxt_debug(task, "http parse request: %d", ret); + nxt_debug(task, "http parse request header: %d", ret); switch (nxt_expect(NXT_DONE, ret)) { case NXT_DONE: - preread = nxt_buf_mem_used_size(&b->mem); - nxt_debug(task, "router request header parsing complete, " "content length: %O, preread: %uz", - h->parsed_content_length, preread); + h->parsed_content_length, nxt_buf_mem_used_size(&buf->mem)); - nxt_router_process_http_request(task, c, ap); - return; + if (b->done) { + nxt_router_process_http_request(task, c, ap); + + return; + } + + if (joint->socket_conf->max_body_size > 0 && + (size_t) h->parsed_content_length > + joint->socket_conf->max_body_size) { + + nxt_router_gen_error(task, c, 413, "Content-Length too big"); + return; + } + + if (nxt_buf_mem_free_size(&buf->mem) == 0) { + size = nxt_min(joint->socket_conf->body_buffer_size, + (size_t) h->parsed_content_length); + + buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0); + if (nxt_slow_path(buf->next == NULL)) { + nxt_router_gen_error(task, c, 500, "Failed to allocate " + "buffer for request body"); + return; + } + + c->read = buf->next; + + b->preread_size += nxt_buf_mem_used_size(&buf->mem); + } + + if (b->buf == NULL) { + b->buf = c->read; + } + + c->read_state = &nxt_router_conn_read_body_state; + break; case NXT_ERROR: - nxt_router_conn_close(task, c, data); + nxt_router_gen_error(task, c, 400, "Request header parse error"); return; default: /* NXT_AGAIN */ - if (h->done == 0) { + if (c->read->mem.free == c->read->mem.end) { + size = joint->socket_conf->large_header_buffer_size; - if (c->read->mem.free == c->read->mem.end) { - joint = c->listen->socket.data; - size = joint->socket_conf->large_header_buffer_size; + if (size <= (size_t) nxt_buf_mem_used_size(&buf->mem) || + ap->r.header.bufs >= joint->socket_conf->large_header_buffers) { + nxt_router_gen_error(task, c, 413, + "Too long request headers"); + return; + } - if (size > (size_t) nxt_buf_mem_size(&b->mem)) { - b = nxt_buf_mem_alloc(c->mem_pool, size, 0); - if (nxt_slow_path(b == NULL)) { - nxt_router_conn_close(task, c, data); - return; - } + buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0); + if (nxt_slow_path(buf->next == NULL)) { + nxt_router_gen_error(task, c, 500, + "Failed to allocate large header " + "buffer"); + return; + } - size = c->read->mem.free - c->read->mem.pos; + ap->r.header.bufs++; - c->read = nxt_buf_cpy(b, c->read->mem.pos, size); - } else { - nxt_router_gen_error(task, c, 400, - "Too long request headers"); - return; - } - } + size = c->read->mem.free - c->read->mem.pos; + + c->read = nxt_buf_cpy(buf->next, c->read->mem.pos, size); } - if (ap->r.body.done == 0) { + } + + nxt_conn_read(task->thread->engine, c); +} - preread = nxt_buf_mem_used_size(&b->mem); - if (h->parsed_content_length - preread > - (size_t) nxt_buf_mem_free_size(&b->mem)) { +static void +nxt_router_conn_http_body_read(nxt_task_t *task, void *obj, void *data) +{ + size_t size; + nxt_int_t ret; + nxt_buf_t *buf; + nxt_conn_t *c; + nxt_app_parse_ctx_t *ap; + nxt_app_request_body_t *b; + nxt_socket_conf_joint_t *joint; + nxt_app_request_header_t *h; - b = nxt_buf_mem_alloc(c->mem_pool, h->parsed_content_length, 0); - if (nxt_slow_path(b == NULL)) { - nxt_router_gen_error(task, c, 500, "Failed to allocate " - "buffer for request body"); - return; - } + c = obj; + ap = data; + buf = c->read; - c->read = nxt_buf_cpy(b, c->read->mem.pos, preread); - } + nxt_debug(task, "router conn http body read"); - nxt_debug(task, "router request body read again, rest: %uz", - h->parsed_content_length - preread); + nxt_assert(ap != NULL); + b = &ap->r.body; + h = &ap->r.header; + + ret = nxt_app_http_req_body_read(task, ap, buf); + + nxt_debug(task, "http read request body: %d", ret); + + switch (nxt_expect(NXT_DONE, ret)) { + + case NXT_DONE: + nxt_router_process_http_request(task, c, ap); + return; + + case NXT_ERROR: + nxt_router_gen_error(task, c, 500, "Read body error"); + return; + + default: /* NXT_AGAIN */ + + if (nxt_buf_mem_free_size(&buf->mem) == 0) { + joint = c->listen->socket.data; + + b->preread_size += nxt_buf_mem_used_size(&buf->mem); + + size = nxt_min(joint->socket_conf->body_buffer_size, + (size_t) h->parsed_content_length - b->preread_size); + + buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0); + if (nxt_slow_path(buf->next == NULL)) { + nxt_router_gen_error(task, c, 500, "Failed to allocate " + "buffer for request body"); + return; + } + + c->read = buf->next; } + nxt_debug(task, "router request body read again, rest: %uz", + h->parsed_content_length - b->preread_size); } nxt_conn_read(task->thread->engine, c); @@ -2725,9 +2862,12 @@ nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data) c = nxt_read_timer_conn(timer); - c->write_state = &nxt_router_conn_close_state; + if (c->read_state == &nxt_router_conn_read_header_state) { + nxt_router_gen_error(task, c, 408, "Read header timeout"); - nxt_conn_close(task->thread->engine, c); + } else { + nxt_router_gen_error(task, c, 408, "Read body timeout"); + } } |