summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_router.c
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2017-08-11 18:04:04 +0300
committerMax Romanov <max.romanov@nginx.com>2017-08-11 18:04:04 +0300
commit39a6a4c973dd378f1fad9d2514d7857fe491df4b (patch)
tree220ea8605a9396a93fa7a459ff3e558fe1a044aa /src/nxt_router.c
parente1e808bd94609c80b4990939285d47f124bb2eef (diff)
downloadunit-39a6a4c973dd378f1fad9d2514d7857fe491df4b.tar.gz
unit-39a6a4c973dd378f1fad9d2514d7857fe491df4b.tar.bz2
Request body read state implemented.
With specific timeout and buffer size settings.
Diffstat (limited to 'src/nxt_router.c')
-rw-r--r--src/nxt_router.c242
1 files changed, 191 insertions, 51 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 5beceb38..fa2e957d 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -128,6 +128,8 @@ static void nxt_router_app_release_port(nxt_task_t *task, void *obj,
static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj,
void *data);
+static void nxt_router_conn_http_body_read(nxt_task_t *task, void *obj,
+ void *data);
static void nxt_router_process_http_request(nxt_task_t *task,
nxt_conn_t *c, nxt_app_parse_ctx_t *ap);
static void nxt_router_process_http_request_mp(nxt_task_t *task,
@@ -604,10 +606,34 @@ static nxt_conf_map_t nxt_router_http_conf[] = {
},
{
+ nxt_string("large_header_buffers"),
+ NXT_CONF_MAP_SIZE,
+ offsetof(nxt_socket_conf_t, large_header_buffers),
+ },
+
+ {
+ nxt_string("body_buffer_size"),
+ NXT_CONF_MAP_SIZE,
+ offsetof(nxt_socket_conf_t, body_buffer_size),
+ },
+
+ {
+ nxt_string("max_body_size"),
+ NXT_CONF_MAP_SIZE,
+ offsetof(nxt_socket_conf_t, max_body_size),
+ },
+
+ {
nxt_string("header_read_timeout"),
NXT_CONF_MAP_MSEC,
offsetof(nxt_socket_conf_t, header_read_timeout),
},
+
+ {
+ nxt_string("body_read_timeout"),
+ NXT_CONF_MAP_MSEC,
+ offsetof(nxt_socket_conf_t, body_read_timeout),
+ },
};
@@ -792,7 +818,11 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
// STUB, default values if http block is not defined.
skcf->header_buffer_size = 2048;
skcf->large_header_buffer_size = 8192;
+ skcf->large_header_buffers = 4;
+ skcf->body_buffer_size = 16 * 1024;
+ skcf->max_body_size = 2 * 1024 * 1024;
skcf->header_read_timeout = 5000;
+ skcf->body_read_timeout = 5000;
if (http != NULL) {
ret = nxt_conf_map_object(http, nxt_router_http_conf,
@@ -1807,7 +1837,7 @@ nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
}
-static const nxt_conn_state_t nxt_router_conn_read_state
+static const nxt_conn_state_t nxt_router_conn_read_header_state
nxt_aligned(64) =
{
.ready_handler = nxt_router_conn_http_header_parse,
@@ -1820,6 +1850,20 @@ static const nxt_conn_state_t nxt_router_conn_read_state
};
+static const nxt_conn_state_t nxt_router_conn_read_body_state
+ nxt_aligned(64) =
+{
+ .ready_handler = nxt_router_conn_http_body_read,
+ .close_handler = nxt_router_conn_close,
+ .error_handler = nxt_router_conn_error,
+
+ .timer_handler = nxt_router_conn_timeout,
+ .timer_value = nxt_router_conn_timeout_value,
+ .timer_data = offsetof(nxt_socket_conf_t, body_read_timeout),
+ .timer_autoreset = 1,
+};
+
+
static void
nxt_router_conn_init(nxt_task_t *task, void *obj, void *data)
{
@@ -1844,7 +1888,7 @@ nxt_router_conn_init(nxt_task_t *task, void *obj, void *data)
c->read_work_queue = &engine->fast_work_queue;
c->write_work_queue = &engine->fast_work_queue;
- c->read_state = &nxt_router_conn_read_state;
+ c->read_state = &nxt_router_conn_read_header_state;
nxt_conn_read(engine, c);
}
@@ -1873,7 +1917,6 @@ nxt_router_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
rc = nxt_event_engine_request_find(engine, msg->port_msg.stream);
if (nxt_slow_path(rc == NULL)) {
-
nxt_debug(task, "request id %08uxD not found", msg->port_msg.stream);
return;
@@ -1910,14 +1953,18 @@ nxt_router_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
rc->app_port = NULL;
}
+
+ rc->conn = NULL;
}
if (b == NULL) {
return;
}
- /* Disable instant buffer completion/re-using by port. */
- msg->buf = NULL;
+ if (msg->buf == b) {
+ /* Disable instant buffer completion/re-using by port. */
+ msg->buf = NULL;
+ }
if (c->write == NULL) {
c->write = b;
@@ -1938,6 +1985,9 @@ nxt_router_text_by_code(int code)
case 400: return "Bad request";
case 404: return "Not found";
case 403: return "Forbidden";
+ case 408: return "Request Timeout";
+ case 411: return "Length Required";
+ case 413: return "Request Entity Too Large";
case 500:
default: return "Internal server error";
}
@@ -1965,6 +2015,7 @@ nxt_router_get_error_buf(nxt_task_t *task, nxt_mp_t *mp, int code,
msg = (const char *) b->mem.free;
b->mem.free = nxt_vsprintf(b->mem.free, b->mem.end, fmt, args);
+ b->mem.free[0] = '\0';
nxt_log_alert(task->log, "error %d: %s", code, msg);
@@ -1996,6 +2047,11 @@ nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code,
b = nxt_router_get_error_buf(task, c->mem_pool, code, fmt, args);
va_end(args);
+ if (c->socket.data != NULL) {
+ nxt_mp_free(c->mem_pool, c->socket.data);
+ c->socket.data = NULL;
+ }
+
if (c->write == NULL) {
c->write = b;
c->write_state = &nxt_router_conn_write_state;
@@ -2345,22 +2401,24 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
static void
nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data)
{
- size_t size, preread;
+ size_t size;
nxt_int_t ret;
- nxt_buf_t *b;
+ nxt_buf_t *buf;
nxt_conn_t *c;
nxt_app_parse_ctx_t *ap;
+ nxt_app_request_body_t *b;
nxt_socket_conf_joint_t *joint;
nxt_app_request_header_t *h;
c = obj;
ap = data;
- b = c->read;
+ buf = c->read;
+ joint = c->listen->socket.data;
nxt_debug(task, "router conn http header parse");
if (ap == NULL) {
- ap = nxt_mp_zget(c->mem_pool, sizeof(nxt_app_parse_ctx_t));
+ ap = nxt_mp_zalloc(c->mem_pool, sizeof(nxt_app_parse_ctx_t));
if (nxt_slow_path(ap == NULL)) {
nxt_router_conn_close(task, c, data);
return;
@@ -2376,78 +2434,157 @@ nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data)
ap->r.remote.start = nxt_sockaddr_address(c->remote);
ap->r.remote.length = c->remote->address_length;
+
+ ap->r.header.buf = buf;
}
h = &ap->r.header;
+ b = &ap->r.body;
- ret = nxt_app_http_req_parse(task, ap, b);
+ ret = nxt_app_http_req_header_parse(task, ap, buf);
- nxt_debug(task, "http parse request: %d", ret);
+ nxt_debug(task, "http parse request header: %d", ret);
switch (nxt_expect(NXT_DONE, ret)) {
case NXT_DONE:
- preread = nxt_buf_mem_used_size(&b->mem);
-
nxt_debug(task, "router request header parsing complete, "
"content length: %O, preread: %uz",
- h->parsed_content_length, preread);
+ h->parsed_content_length, nxt_buf_mem_used_size(&buf->mem));
- nxt_router_process_http_request(task, c, ap);
- return;
+ if (b->done) {
+ nxt_router_process_http_request(task, c, ap);
+
+ return;
+ }
+
+ if (joint->socket_conf->max_body_size > 0 &&
+ (size_t) h->parsed_content_length >
+ joint->socket_conf->max_body_size) {
+
+ nxt_router_gen_error(task, c, 413, "Content-Length too big");
+ return;
+ }
+
+ if (nxt_buf_mem_free_size(&buf->mem) == 0) {
+ size = nxt_min(joint->socket_conf->body_buffer_size,
+ (size_t) h->parsed_content_length);
+
+ buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0);
+ if (nxt_slow_path(buf->next == NULL)) {
+ nxt_router_gen_error(task, c, 500, "Failed to allocate "
+ "buffer for request body");
+ return;
+ }
+
+ c->read = buf->next;
+
+ b->preread_size += nxt_buf_mem_used_size(&buf->mem);
+ }
+
+ if (b->buf == NULL) {
+ b->buf = c->read;
+ }
+
+ c->read_state = &nxt_router_conn_read_body_state;
+ break;
case NXT_ERROR:
- nxt_router_conn_close(task, c, data);
+ nxt_router_gen_error(task, c, 400, "Request header parse error");
return;
default: /* NXT_AGAIN */
- if (h->done == 0) {
+ if (c->read->mem.free == c->read->mem.end) {
+ size = joint->socket_conf->large_header_buffer_size;
- if (c->read->mem.free == c->read->mem.end) {
- joint = c->listen->socket.data;
- size = joint->socket_conf->large_header_buffer_size;
+ if (size <= (size_t) nxt_buf_mem_used_size(&buf->mem) ||
+ ap->r.header.bufs >= joint->socket_conf->large_header_buffers) {
+ nxt_router_gen_error(task, c, 413,
+ "Too long request headers");
+ return;
+ }
- if (size > (size_t) nxt_buf_mem_size(&b->mem)) {
- b = nxt_buf_mem_alloc(c->mem_pool, size, 0);
- if (nxt_slow_path(b == NULL)) {
- nxt_router_conn_close(task, c, data);
- return;
- }
+ buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0);
+ if (nxt_slow_path(buf->next == NULL)) {
+ nxt_router_gen_error(task, c, 500,
+ "Failed to allocate large header "
+ "buffer");
+ return;
+ }
- size = c->read->mem.free - c->read->mem.pos;
+ ap->r.header.bufs++;
- c->read = nxt_buf_cpy(b, c->read->mem.pos, size);
- } else {
- nxt_router_gen_error(task, c, 400,
- "Too long request headers");
- return;
- }
- }
+ size = c->read->mem.free - c->read->mem.pos;
+
+ c->read = nxt_buf_cpy(buf->next, c->read->mem.pos, size);
}
- if (ap->r.body.done == 0) {
+ }
+
+ nxt_conn_read(task->thread->engine, c);
+}
- preread = nxt_buf_mem_used_size(&b->mem);
- if (h->parsed_content_length - preread >
- (size_t) nxt_buf_mem_free_size(&b->mem)) {
+static void
+nxt_router_conn_http_body_read(nxt_task_t *task, void *obj, void *data)
+{
+ size_t size;
+ nxt_int_t ret;
+ nxt_buf_t *buf;
+ nxt_conn_t *c;
+ nxt_app_parse_ctx_t *ap;
+ nxt_app_request_body_t *b;
+ nxt_socket_conf_joint_t *joint;
+ nxt_app_request_header_t *h;
- b = nxt_buf_mem_alloc(c->mem_pool, h->parsed_content_length, 0);
- if (nxt_slow_path(b == NULL)) {
- nxt_router_gen_error(task, c, 500, "Failed to allocate "
- "buffer for request body");
- return;
- }
+ c = obj;
+ ap = data;
+ buf = c->read;
- c->read = nxt_buf_cpy(b, c->read->mem.pos, preread);
- }
+ nxt_debug(task, "router conn http body read");
- nxt_debug(task, "router request body read again, rest: %uz",
- h->parsed_content_length - preread);
+ nxt_assert(ap != NULL);
+ b = &ap->r.body;
+ h = &ap->r.header;
+
+ ret = nxt_app_http_req_body_read(task, ap, buf);
+
+ nxt_debug(task, "http read request body: %d", ret);
+
+ switch (nxt_expect(NXT_DONE, ret)) {
+
+ case NXT_DONE:
+ nxt_router_process_http_request(task, c, ap);
+ return;
+
+ case NXT_ERROR:
+ nxt_router_gen_error(task, c, 500, "Read body error");
+ return;
+
+ default: /* NXT_AGAIN */
+
+ if (nxt_buf_mem_free_size(&buf->mem) == 0) {
+ joint = c->listen->socket.data;
+
+ b->preread_size += nxt_buf_mem_used_size(&buf->mem);
+
+ size = nxt_min(joint->socket_conf->body_buffer_size,
+ (size_t) h->parsed_content_length - b->preread_size);
+
+ buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0);
+ if (nxt_slow_path(buf->next == NULL)) {
+ nxt_router_gen_error(task, c, 500, "Failed to allocate "
+ "buffer for request body");
+ return;
+ }
+
+ c->read = buf->next;
}
+ nxt_debug(task, "router request body read again, rest: %uz",
+ h->parsed_content_length - b->preread_size);
}
nxt_conn_read(task->thread->engine, c);
@@ -2725,9 +2862,12 @@ nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data)
c = nxt_read_timer_conn(timer);
- c->write_state = &nxt_router_conn_close_state;
+ if (c->read_state == &nxt_router_conn_read_header_state) {
+ nxt_router_gen_error(task, c, 408, "Read header timeout");
- nxt_conn_close(task->thread->engine, c);
+ } else {
+ nxt_router_gen_error(task, c, 408, "Read body timeout");
+ }
}