diff options
Diffstat (limited to '')
-rw-r--r-- | src/go/unit/response.go | 2 | ||||
-rw-r--r-- | src/nxt_application.c | 223 | ||||
-rw-r--r-- | src/nxt_application.h | 18 | ||||
-rw-r--r-- | src/nxt_conn.c | 59 | ||||
-rw-r--r-- | src/nxt_conn.h | 12 | ||||
-rw-r--r-- | src/nxt_h1proto.c | 989 | ||||
-rw-r--r-- | src/nxt_http.h | 181 | ||||
-rw-r--r-- | src/nxt_http_error.c | 105 | ||||
-rw-r--r-- | src/nxt_http_request.c | 384 | ||||
-rw-r--r-- | src/nxt_http_response.c | 83 | ||||
-rw-r--r-- | src/nxt_php_sapi.c | 25 | ||||
-rw-r--r-- | src/nxt_python_wsgi.c | 10 | ||||
-rw-r--r-- | src/nxt_router.c | 760 | ||||
-rw-r--r-- | src/nxt_router.h | 7 | ||||
-rw-r--r-- | src/nxt_sendbuf.c | 20 | ||||
-rw-r--r-- | src/nxt_sendbuf.h | 2 |
16 files changed, 1989 insertions, 891 deletions
diff --git a/src/go/unit/response.go b/src/go/unit/response.go index 258a82c9..44694011 100644 --- a/src/go/unit/response.go +++ b/src/go/unit/response.go @@ -54,7 +54,7 @@ func (r *response) WriteHeader(code int) { return } r.headerSent = true - fmt.Fprintf(r, "%s %d %s\r\n", r.req.Proto, code, http.StatusText(code)) + fmt.Fprintf(r, "Status: %d\r\n", code) // Set a default Content-Type if _, hasType := r.header["Content-Type"]; !hasType { diff --git a/src/nxt_application.c b/src/nxt_application.c index 29b7a1e6..e77a9796 100644 --- a/src/nxt_application.c +++ b/src/nxt_application.c @@ -10,6 +10,8 @@ #include <nxt_runtime.h> #include <nxt_application.h> #include <nxt_main_process.h> +#include <nxt_router.h> +#include <nxt_http.h> #include <glob.h> @@ -28,22 +30,8 @@ static nxt_app_module_t *nxt_app_module_load(nxt_task_t *task, const char *name); static nxt_app_type_t nxt_app_parse_type(u_char *p, size_t length); -static nxt_int_t nxt_app_request_content_length(void *ctx, - nxt_http_field_t *field, uintptr_t data); -static nxt_int_t nxt_app_request_content_type(void *ctx, - nxt_http_field_t *field, uintptr_t data); -static nxt_int_t nxt_app_request_cookie(void *ctx, nxt_http_field_t *field, - uintptr_t data); -static nxt_int_t nxt_app_request_host(void *ctx, nxt_http_field_t *field, - uintptr_t data); - - -static nxt_http_field_proc_t nxt_app_request_fields[] = { - { nxt_string("Content-Length"), &nxt_app_request_content_length, 0 }, - { nxt_string("Content-Type"), &nxt_app_request_content_type, 0 }, - { nxt_string("Cookie"), &nxt_app_request_cookie, 0 }, - { nxt_string("Host"), &nxt_app_request_host, 0 }, -}; + +static void nxt_app_http_release(nxt_task_t *task, void *obj, void *data); static uint32_t compat[] = { @@ -51,8 +39,6 @@ static uint32_t compat[] = { }; -static nxt_lvlhsh_t nxt_app_request_fields_hash; - static nxt_thread_mutex_t nxt_app_mutex; static nxt_thread_cond_t nxt_app_cond; @@ -372,15 +358,6 @@ nxt_app_module_load(nxt_task_t *task, const char *name) } -nxt_int_t -nxt_app_http_init(nxt_task_t *task, nxt_runtime_t *rt) -{ - return nxt_http_fields_hash(&nxt_app_request_fields_hash, rt->mem_pool, - nxt_app_request_fields, - nxt_nitems(nxt_app_request_fields)); -} - - void nxt_app_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { @@ -704,197 +681,29 @@ nxt_app_msg_read_size(nxt_task_t *task, nxt_app_rmsg_t *msg, size_t *size) } -static nxt_int_t -nxt_app_request_content_length(void *ctx, nxt_http_field_t *field, - uintptr_t data) -{ - nxt_app_parse_ctx_t *c; - nxt_app_request_header_t *h; - - c = ctx; - h = &c->r.header; - - h->content_length.length = field->value_length; - h->content_length.start = field->value; - - h->parsed_content_length = nxt_off_t_parse(field->value, - field->value_length); - - return NXT_OK; -} - - -static nxt_int_t -nxt_app_request_content_type(void *ctx, nxt_http_field_t *field, - uintptr_t data) -{ - nxt_app_parse_ctx_t *c; - nxt_app_request_header_t *h; - - c = ctx; - h = &c->r.header; - - h->content_type.length = field->value_length; - h->content_type.start = field->value; - - return NXT_OK; -} - - -static nxt_int_t -nxt_app_request_cookie(void *ctx, nxt_http_field_t *field, uintptr_t data) -{ - nxt_app_parse_ctx_t *c; - nxt_app_request_header_t *h; - - c = ctx; - h = &c->r.header; - - h->cookie.length = field->value_length; - h->cookie.start = field->value; - - return NXT_OK; -} - - -static nxt_int_t -nxt_app_request_host(void *ctx, nxt_http_field_t *field, uintptr_t data) -{ - nxt_app_parse_ctx_t *c; - nxt_app_request_header_t *h; - - c = ctx; - h = &c->r.header; - - h->host.length = field->value_length; - h->host.start = field->value; - - return NXT_OK; -} - - -nxt_app_parse_ctx_t * -nxt_app_http_req_init(nxt_task_t *task) -{ - nxt_mp_t *mp; - nxt_int_t rc; - nxt_app_parse_ctx_t *ctx; - - mp = nxt_mp_create(1024, 128, 256, 32); - if (nxt_slow_path(mp == NULL)) { - return NULL; - } - - ctx = nxt_mp_zget(mp, sizeof(nxt_app_parse_ctx_t)); - if (nxt_slow_path(ctx == NULL)) { - nxt_mp_destroy(mp); - return NULL; - } - - ctx->mem_pool = mp; - - rc = nxt_http_parse_request_init(&ctx->parser, mp); - if (nxt_slow_path(rc != NXT_OK)) { - nxt_mp_destroy(mp); - return NULL; - } - - return ctx; -} - - nxt_int_t -nxt_app_http_req_header_parse(nxt_task_t *task, nxt_app_parse_ctx_t *ctx, - nxt_buf_t *buf) +nxt_app_http_req_done(nxt_task_t *task, nxt_app_parse_ctx_t *ar) { - nxt_int_t rc; - nxt_app_request_body_t *b; - nxt_http_request_parse_t *p; - nxt_app_request_header_t *h; - - p = &ctx->parser; - b = &ctx->r.body; - h = &ctx->r.header; - - nxt_assert(h->done == 0); - - rc = nxt_http_parse_request(p, &buf->mem); - - if (nxt_slow_path(rc != NXT_DONE)) { - return rc; - } - - rc = nxt_http_fields_process(p->fields, &nxt_app_request_fields_hash, ctx); - - if (nxt_slow_path(rc != NXT_OK)) { - return rc; - } - - h->fields = p->fields; - h->done = 1; + ar->timer.handler = nxt_app_http_release; + nxt_timer_add(task->thread->engine, &ar->timer, 0); - h->version.start = p->version.str; - h->version.length = sizeof(p->version.str); - - h->method = p->method; - - h->target.start = p->target_start; - h->target.length = p->target_end - p->target_start; - - h->path = p->path; - h->query = p->args; - - if (h->parsed_content_length == 0) { - b->done = 1; - - } - - if (buf->mem.free == buf->mem.pos) { - return NXT_DONE; - } - - b->buf = buf; - b->done = nxt_buf_mem_used_size(&buf->mem) >= - h->parsed_content_length; - - if (b->done == 1) { - b->preread_size = nxt_buf_mem_used_size(&buf->mem); - } - - return NXT_DONE; + return NXT_OK; } -nxt_int_t -nxt_app_http_req_body_read(nxt_task_t *task, nxt_app_parse_ctx_t *ctx, - nxt_buf_t *buf) +static void +nxt_app_http_release(nxt_task_t *task, void *obj, void *data) { - nxt_app_request_body_t *b; - nxt_app_request_header_t *h; - - b = &ctx->r.body; - h = &ctx->r.header; - - nxt_assert(h->done == 1); - nxt_assert(b->done == 0); - - b->done = nxt_buf_mem_used_size(&buf->mem) + b->preread_size >= - (size_t) h->parsed_content_length; + nxt_timer_t *timer; + nxt_app_parse_ctx_t *ar; - if (b->done == 1) { - b->preread_size += nxt_buf_mem_used_size(&buf->mem); - } - - return b->done == 1 ? NXT_DONE : NXT_AGAIN; -} + timer = obj; + nxt_debug(task, "http app release"); -nxt_int_t -nxt_app_http_req_done(nxt_task_t *task, nxt_app_parse_ctx_t *ctx) -{ - nxt_mp_release(ctx->mem_pool); + ar = nxt_timer_data(timer, nxt_app_parse_ctx_t, timer); - return NXT_OK; + nxt_mp_release(ar->request->mem_pool); } diff --git a/src/nxt_application.h b/src/nxt_application.h index b42d77ff..a9a1d5d3 100644 --- a/src/nxt_application.h +++ b/src/nxt_application.h @@ -115,28 +115,22 @@ typedef struct { } nxt_app_request_t; -typedef struct nxt_app_parse_ctx_s nxt_app_parse_ctx_t; +typedef struct nxt_http_request_s nxt_http_request_t; + +typedef struct nxt_app_parse_ctx_s nxt_app_parse_ctx_t; struct nxt_app_parse_ctx_s { nxt_app_request_t r; + nxt_http_request_t *request; + nxt_timer_t timer; nxt_http_request_parse_t parser; + nxt_http_request_parse_t resp_parser; nxt_mp_t *mem_pool; }; -nxt_app_parse_ctx_t *nxt_app_http_req_init(nxt_task_t *task); - -nxt_int_t nxt_app_http_req_header_parse(nxt_task_t *task, - nxt_app_parse_ctx_t *ctx, nxt_buf_t *buf); - -nxt_int_t nxt_app_http_req_body_read(nxt_task_t *task, - nxt_app_parse_ctx_t *ctx, nxt_buf_t *buf); - - nxt_int_t nxt_app_http_req_done(nxt_task_t *task, nxt_app_parse_ctx_t *ctx); -nxt_int_t nxt_app_http_init(nxt_task_t *task, nxt_runtime_t *rt); - typedef struct nxt_app_wmsg_s nxt_app_wmsg_t; typedef struct nxt_app_rmsg_s nxt_app_rmsg_t; diff --git a/src/nxt_conn.c b/src/nxt_conn.c index 854181ca..6d0182e3 100644 --- a/src/nxt_conn.c +++ b/src/nxt_conn.c @@ -85,8 +85,6 @@ nxt_conn_create(nxt_mp_t *mp, nxt_task_t *task) nxt_conn_timer_init(&c->read_timer, c, c->socket.read_work_queue); nxt_conn_timer_init(&c->write_timer, c, c->socket.write_work_queue); - nxt_queue_init(&c->requests); - nxt_log_debug(&c->log, "connections: %uD", thr->engine->connections); return c; @@ -165,3 +163,60 @@ nxt_conn_work_queue_set(nxt_conn_t *c, nxt_work_queue_t *wq) c->read_timer.work_queue = wq; c->write_timer.work_queue = wq; } + + +nxt_sockaddr_t * +nxt_conn_local_addr(nxt_task_t *task, nxt_conn_t *c) +{ + int ret; + size_t size, length; + socklen_t socklen; + nxt_sockaddr_t *sa; + + if (c->local != NULL) { + return c->local; + } + + /* AF_UNIX should not get in here. */ + + switch (c->remote->u.sockaddr.sa_family) { +#if (NXT_INET6) + case AF_INET6: + socklen = sizeof(struct sockaddr_in6); + length = NXT_INET6_ADDR_STR_LEN; + size = offsetof(nxt_sockaddr_t, u) + socklen + length; + break; +#endif + case AF_INET: + default: + socklen = sizeof(struct sockaddr_in); + length = NXT_INET_ADDR_STR_LEN; + size = offsetof(nxt_sockaddr_t, u) + socklen + length; + break; + } + + sa = nxt_mp_get(c->mem_pool, size); + if (nxt_slow_path(sa == NULL)) { + return NULL; + } + + sa->socklen = socklen; + sa->length = length; + + ret = getsockname(c->socket.fd, &sa->u.sockaddr, &socklen); + if (nxt_slow_path(ret != 0)) { + nxt_log(task, NXT_LOG_CRIT, "getsockname(%d) failed", c->socket.fd); + return NULL; + } + + c->local = sa; + + nxt_sockaddr_text(sa); + + /* + * TODO: here we can adjust the end of non-freeable block + * in c->mem_pool to the end of actual sockaddr length. + */ + + return sa; +} diff --git a/src/nxt_conn.h b/src/nxt_conn.h index d2f3db3b..110b1634 100644 --- a/src/nxt_conn.h +++ b/src/nxt_conn.h @@ -141,8 +141,6 @@ struct nxt_conn_s { nxt_conn_io_t *io; - nxt_queue_t requests; /* of nxt_req_conn_link_t */ - union { #if (NXT_SSLTLS) void *ssltls; @@ -231,6 +229,8 @@ NXT_EXPORT void nxt_conn_close(nxt_event_engine_t *engine, nxt_conn_t *c); NXT_EXPORT void nxt_conn_timer(nxt_event_engine_t *engine, nxt_conn_t *c, const nxt_conn_state_t *state, nxt_timer_t *tev); NXT_EXPORT void nxt_conn_work_queue_set(nxt_conn_t *c, nxt_work_queue_t *wq); +NXT_EXPORT nxt_sockaddr_t *nxt_conn_local_addr(nxt_task_t *task, + nxt_conn_t *c); void nxt_conn_sys_socket(nxt_task_t *task, void *obj, void *data); void nxt_conn_io_connect(nxt_task_t *task, void *obj, void *data); @@ -289,13 +289,13 @@ NXT_EXPORT void nxt_event_conn_job_sendfile(nxt_task_t *task, } while (0) -#define nxt_conn_write(e, c) \ +#define nxt_conn_write(engine, c) \ do { \ - nxt_event_engine_t *engine = e; \ + nxt_event_engine_t *e = engine; \ \ - c->socket.write_work_queue = &engine->write_work_queue; \ + c->socket.write_work_queue = &e->write_work_queue; \ \ - nxt_work_queue_add(&engine->write_work_queue, c->io->write, \ + nxt_work_queue_add(&e->write_work_queue, c->io->write, \ c->socket.task, c, c->socket.data); \ } while (0) diff --git a/src/nxt_h1proto.c b/src/nxt_h1proto.c new file mode 100644 index 00000000..3d86b5dd --- /dev/null +++ b/src/nxt_h1proto.c @@ -0,0 +1,989 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + +#include <nxt_router.h> +#include <nxt_http.h> + + +static void nxt_h1p_read_header(nxt_task_t *task, void *obj, void *data); +static void nxt_h1p_header_parse(nxt_task_t *task, void *obj, void *data); +static nxt_int_t nxt_h1p_connection(void *ctx, nxt_http_field_t *field, + uintptr_t data); +static nxt_int_t nxt_h1p_transfer_encoding(void *ctx, nxt_http_field_t *field, + uintptr_t data); +static void nxt_h1p_request_body_read(nxt_task_t *task, nxt_http_request_t *r); +static void nxt_h1p_body_read(nxt_task_t *task, void *obj, void *data); +static void nxt_h1p_request_local_addr(nxt_task_t *task, nxt_http_request_t *r); +static void nxt_h1p_request_header_send(nxt_task_t *task, + nxt_http_request_t *r); +static void nxt_h1p_request_send(nxt_task_t *task, nxt_http_request_t *r, + nxt_buf_t *out); +static nxt_buf_t *nxt_h1p_chunk_create(nxt_task_t *task, nxt_http_request_t *r, + nxt_buf_t *out); +static void nxt_h1p_sent(nxt_task_t *task, void *obj, void *data); +static void nxt_h1p_request_close(nxt_task_t *task, nxt_http_proto_t proto); +static void nxt_h1p_keepalive(nxt_task_t *task, nxt_h1proto_t *h1p, + nxt_conn_t *c); +static void nxt_h1p_close(nxt_task_t *task, nxt_conn_t *c); +static void nxt_h1p_conn_close(nxt_task_t *task, void *obj, void *data); +static void nxt_h1p_conn_error(nxt_task_t *task, void *obj, void *data); +static void nxt_h1p_conn_timeout(nxt_task_t *task, void *obj, void *data); +static nxt_msec_t nxt_h1p_timeout_value(nxt_conn_t *c, uintptr_t data); + + +static const nxt_conn_state_t nxt_h1p_idle_state; +static const nxt_conn_state_t nxt_h1p_read_header_state; +static const nxt_conn_state_t nxt_h1p_read_body_state; +static const nxt_conn_state_t nxt_h1p_send_state; + + +const nxt_http_proto_body_read_t nxt_http_proto_body_read[3] = { + nxt_h1p_request_body_read, + NULL, + NULL, +}; + + +const nxt_http_proto_local_addr_t nxt_http_proto_local_addr[3] = { + nxt_h1p_request_local_addr, + NULL, + NULL, +}; + + +const nxt_http_proto_header_send_t nxt_http_proto_header_send[3] = { + nxt_h1p_request_header_send, + NULL, + NULL, +}; + + +const nxt_http_proto_send_t nxt_http_proto_send[3] = { + nxt_h1p_request_send, + NULL, + NULL, +}; + + +const nxt_http_proto_close_t nxt_http_proto_close[3] = { + nxt_h1p_request_close, + NULL, + NULL, +}; + + +static nxt_lvlhsh_t nxt_h1p_fields_hash; + +static nxt_http_field_proc_t nxt_h1p_fields[] = { + { nxt_string("Connection"), &nxt_h1p_connection, 0 }, + { nxt_string("Transfer-Encoding"), &nxt_h1p_transfer_encoding, 0 }, + + { nxt_string("Host"), &nxt_http_request_host, 0 }, + { nxt_string("Cookie"), &nxt_http_request_field, + offsetof(nxt_http_request_t, cookie) }, + { nxt_string("Content-Type"), &nxt_http_request_field, + offsetof(nxt_http_request_t, content_type) }, + { nxt_string("Content-Length"), &nxt_http_request_content_length, 0 }, +}; + + +nxt_int_t +nxt_h1p_init(nxt_task_t *task, nxt_runtime_t *rt) +{ + return nxt_http_fields_hash(&nxt_h1p_fields_hash, rt->mem_pool, + nxt_h1p_fields, nxt_nitems(nxt_h1p_fields)); +} + + +void +nxt_http_conn_init(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + nxt_socket_conf_t *skcf; + nxt_event_engine_t *engine; + nxt_socket_conf_joint_t *joint; + + c = obj; + joint = data; + + nxt_debug(task, "http conn init"); + + c->joint = joint; + joint->count++; + + skcf = joint->socket_conf; + c->local = skcf->sockaddr; + c->socket.data = NULL; + + engine = task->thread->engine; + c->read_work_queue = &engine->fast_work_queue; + c->write_work_queue = &engine->fast_work_queue; + + c->read_state = &nxt_h1p_idle_state; + + nxt_conn_wait(c); +} + + +static const nxt_conn_state_t nxt_h1p_idle_state + nxt_aligned(64) = +{ + .ready_handler = nxt_h1p_read_header, + .close_handler = nxt_h1p_conn_close, + .error_handler = nxt_h1p_conn_error, + + .timer_handler = nxt_h1p_conn_timeout, + .timer_value = nxt_h1p_timeout_value, + .timer_data = offsetof(nxt_socket_conf_t, idle_timeout), +}; + + +static void +nxt_h1p_read_header(nxt_task_t *task, void *obj, void *data) +{ + size_t size; + nxt_conn_t *c; + nxt_socket_conf_joint_t *joint; + + c = obj; + + nxt_debug(task, "h1p read header"); + + if (c->read == NULL) { + joint = c->joint; + size = joint->socket_conf->header_buffer_size; + + c->read = nxt_buf_mem_alloc(c->mem_pool, size, 0); + if (nxt_slow_path(c->read == NULL)) { + nxt_h1p_conn_error(task, c, c->socket.data); + return; + } + } + + c->read_state = &nxt_h1p_read_header_state; + + nxt_conn_read(task->thread->engine, c); +} + + +static const nxt_conn_state_t nxt_h1p_read_header_state + nxt_aligned(64) = +{ + .ready_handler = nxt_h1p_header_parse, + .close_handler = nxt_h1p_conn_close, + .error_handler = nxt_h1p_conn_error, + + .timer_handler = nxt_h1p_conn_timeout, + .timer_value = nxt_h1p_timeout_value, + .timer_data = offsetof(nxt_socket_conf_t, header_read_timeout), +}; + + +static void +nxt_h1p_header_parse(nxt_task_t *task, void *obj, void *data) +{ + size_t size; + nxt_int_t ret; + nxt_buf_t *in, *b; + nxt_conn_t *c; + nxt_h1proto_t *h1p; + nxt_http_request_t *r; + nxt_socket_conf_joint_t *joint; + + c = obj; + h1p = data; + + nxt_debug(task, "h1p header parse"); + + if (h1p == NULL) { + h1p = nxt_mp_zget(c->mem_pool, sizeof(nxt_h1proto_t)); + if (nxt_slow_path(h1p == NULL)) { + goto fail; + } + + c->socket.data = h1p; + h1p->conn = c; + } + + r = h1p->request; + + if (r == NULL) { + r = nxt_http_request_create(task); + if (nxt_slow_path(r == NULL)) { + goto fail; + } + + h1p->request = r; + r->proto.h1 = h1p; + joint = c->joint; + r->socket_conf = joint->socket_conf; + + r->remote = c->remote; + + ret = nxt_http_parse_request_init(&h1p->parser, r->mem_pool); + if (nxt_slow_path(ret != NXT_OK)) { + /* + * The request is very uncomplete here, + * so "internal server error" useless here. + */ + nxt_mp_release(r->mem_pool); + h1p->request = NULL; + goto fail; + } + } + + ret = nxt_http_parse_request(&h1p->parser, &c->read->mem); + + if (nxt_fast_path(ret == NXT_DONE)) { + r->target.start = h1p->parser.target_start; + r->target.length = h1p->parser.target_end - h1p->parser.target_start; + + r->version.start = h1p->parser.version.str; + r->version.length = sizeof(h1p->parser.version.str); + + r->method = &h1p->parser.method; + r->path = &h1p->parser.path; + r->args = &h1p->parser.args; + + /* + * By default the keepalive mode is disabled in HTTP/1.0 and + * enabled in HTTP/1.1. The mode can be overridden later by + * the "Connection" field processed in nxt_h1p_connection(). + */ + h1p->keepalive = (h1p->parser.version.str[7] != '0'); + + r->fields = h1p->parser.fields; + + ret = nxt_http_fields_process(r->fields, &nxt_h1p_fields_hash, r); + + if (nxt_fast_path(ret == NXT_OK)) { + r->state->ready_handler(task, r, NULL); + return; + } + + } else if (ret == NXT_AGAIN) { + in = c->read; + + if (nxt_buf_mem_free_size(&in->mem) == 0) { + size = r->socket_conf->large_header_buffer_size; + + if (size <= (size_t) nxt_buf_mem_used_size(&in->mem) + || h1p->nbuffers >= r->socket_conf->large_header_buffers) + { + nxt_http_request_error(task, r, + NXT_HTTP_REQUEST_HEADER_FIELDS_TOO_LARGE); + return; + } + + b = nxt_buf_mem_alloc(c->mem_pool, size, 0); + if (nxt_slow_path(b == NULL)) { + nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); + return; + } + + h1p->nbuffers++; + + size = nxt_buf_mem_used_size(&in->mem); + b->mem.free = nxt_cpymem(b->mem.pos, in->mem.pos, size); + + in->next = b; + nxt_buf_chain_add(&h1p->buffers, in); + c->read = b; + } + + nxt_conn_read(task->thread->engine, c); + return; + } + + /* ret == NXT_ERROR */ + + nxt_http_request_error(task, r, NXT_HTTP_BAD_REQUEST); + return; + +fail: + + nxt_h1p_conn_close(task, c, h1p); +} + + +static nxt_int_t +nxt_h1p_connection(void *ctx, nxt_http_field_t *field, uintptr_t data) +{ + nxt_http_request_t *r; + + r = ctx; + + if (field->value_length == 5 && nxt_memcmp(field->value, "close", 5) == 0) { + r->proto.h1->keepalive = 0; + } + + return NXT_OK; +} + + +static nxt_int_t +nxt_h1p_transfer_encoding(void *ctx, nxt_http_field_t *field, uintptr_t data) +{ + nxt_http_te_t te; + nxt_http_request_t *r; + + r = ctx; + + if (field->value_length == 7 + && nxt_memcmp(field->value, "chunked", 7) == 0) + { + te = NXT_HTTP_TE_CHUNKED; + + } else { + te = NXT_HTTP_TE_UNSUPPORTED; + } + + r->proto.h1->transfer_encoding = te; + + return NXT_OK; +} + + +static void +nxt_h1p_request_body_read(nxt_task_t *task, nxt_http_request_t *r) +{ + size_t size, rest_length; + nxt_buf_t *b; + nxt_conn_t *c; + nxt_http_status_t status; + + nxt_debug(task, "h1p body read %O te:%d", + r->content_length_n, r->proto.h1->transfer_encoding); + + switch (r->proto.h1->transfer_encoding) { + + case NXT_HTTP_TE_CHUNKED: + status = NXT_HTTP_LENGTH_REQUIRED; + goto error; + + case NXT_HTTP_TE_UNSUPPORTED: + status = NXT_HTTP_NOT_IMPLEMENTED; + goto error; + + default: + case NXT_HTTP_TE_NONE: + break; + } + + if (r->content_length_n == -1 || r->content_length_n == 0) { + goto ready; + } + + if (r->content_length_n > (nxt_off_t) r->socket_conf->max_body_size) { + status = NXT_HTTP_PAYLOAD_TOO_LARGE; + goto error; + } + + rest_length = (size_t) r->content_length_n; + + b = r->body; + + if (b == NULL) { + b = nxt_buf_mem_alloc(r->mem_pool, rest_length, 0); + if (nxt_slow_path(b == NULL)) { + status = NXT_HTTP_INTERNAL_SERVER_ERROR; + goto error; + } + + r->body = b; + } + + c = r->proto.h1->conn; + + size = nxt_buf_mem_used_size(&c->read->mem); + + if (size != 0) { + if (size >= rest_length) { + size = rest_length; + rest_length = 0; + + } else { + rest_length -= size; + } + + b->mem.free = nxt_cpymem(b->mem.free, c->read->mem.pos, size); + c->read->mem.pos += size; + } + + nxt_debug(task, "h1p body rest: %O", rest_length); + + r->rest_length = rest_length; + + if (rest_length != 0) { + c->read = b; + c->read_state = &nxt_h1p_read_body_state; + + nxt_conn_read(task->thread->engine, c); + return; + } + +ready: + + nxt_work_queue_add(&task->thread->engine->fast_work_queue, + r->state->ready_handler, task, r, NULL); + + return; + +error: + + r->proto.h1->keepalive = 0; + + nxt_http_request_error(task, r, status); +} + + +static const nxt_conn_state_t nxt_h1p_read_body_state + nxt_aligned(64) = +{ + .ready_handler = nxt_h1p_body_read, + .close_handler = nxt_h1p_conn_close, + .error_handler = nxt_h1p_conn_error, + + .timer_handler = nxt_h1p_conn_timeout, + .timer_value = nxt_h1p_timeout_value, + .timer_data = offsetof(nxt_socket_conf_t, body_read_timeout), + .timer_autoreset = 1, +}; + + +static void +nxt_h1p_body_read(nxt_task_t *task, void *obj, void *data) +{ + size_t size; + nxt_conn_t *c; + nxt_h1proto_t *h1p; + nxt_http_request_t *r; + + c = obj; + h1p = data; + + nxt_debug(task, "h1p body read"); + + r = h1p->request; + size = nxt_buf_mem_used_size(&c->read->mem); + + c->read->mem.pos += size; + r->rest_length -= size; + + nxt_debug(task, "h1p body rest: %O", r->rest_length); + + if (r->rest_length != 0) { + nxt_conn_read(task->thread->engine, c); + + } else { + nxt_work_queue_add(&task->thread->engine->fast_work_queue, + r->state->ready_handler, task, r, NULL); + } +} + + +static void +nxt_h1p_request_local_addr(nxt_task_t *task, nxt_http_request_t *r) +{ + r->local = nxt_conn_local_addr(task, r->proto.h1->conn); +} + + +#define NXT_HTTP_LAST_SUCCESS \ + (NXT_HTTP_OK + nxt_nitems(nxt_http_success) - 1) + +static const nxt_str_t nxt_http_success[] = { + nxt_string("HTTP/1.1 200 OK\r\n"), + nxt_string("HTTP/1.1 201 Created\r\n"), + nxt_string("HTTP/1.1 202 Accepted\r\n"), + nxt_string("HTTP/1.1 203 Non-Authoritative Information\r\n"), + nxt_string("HTTP/1.1 204 No Content\r\n"), + nxt_string("HTTP/1.1 205 Reset Content\r\n"), + nxt_string("HTTP/1.1 206 Partial Content\r\n"), +}; + + +#define NXT_HTTP_LAST_REDIRECTION \ + (NXT_HTTP_MULTIPLE_CHOICES + nxt_nitems(nxt_http_redirection) - 1) + +static const nxt_str_t nxt_http_redirection[] = { + nxt_string("HTTP/1.1 300 Multiple Choices\r\n"), + nxt_string("HTTP/1.1 301 Moved Permanently\r\n"), + nxt_string("HTTP/1.1 302 Found\r\n"), + nxt_string("HTTP/1.1 303 See Other\r\n"), + nxt_string("HTTP/1.1 304 Not Modified\r\n"), +}; + + +#define NXT_HTTP_LAST_CLIENT_ERROR \ + (NXT_HTTP_BAD_REQUEST + nxt_nitems(nxt_http_client_error) - 1) + +static const nxt_str_t nxt_http_client_error[] = { + nxt_string("HTTP/1.1 400 Bad Request\r\n"), + nxt_string("HTTP/1.1 401 Unauthorized\r\n"), + nxt_string("HTTP/1.1 402 Payment Required\r\n"), + nxt_string("HTTP/1.1 403 Forbidden\r\n"), + nxt_string("HTTP/1.1 404 Not Found\r\n"), + nxt_string("HTTP/1.1 405 Method Not Allowed\r\n"), + nxt_string("HTTP/1.1 406 Not Acceptable\r\n"), + nxt_string("HTTP/1.1 407 Proxy Authentication Required\r\n"), + nxt_string("HTTP/1.1 408 Request Timeout\r\n"), + nxt_string("HTTP/1.1 409 Conflict\r\n"), + nxt_string("HTTP/1.1 410 Gone\r\n"), + nxt_string("HTTP/1.1 411 Length Required\r\n"), + nxt_string("HTTP/1.1 412 Precondition Failed\r\n"), + nxt_string("HTTP/1.1 413 Payload Too Large\r\n"), + nxt_string("HTTP/1.1 414 URI Too Long\r\n"), + nxt_string("HTTP/1.1 415 Unsupported Media Type\r\n"), + nxt_string("HTTP/1.1 416 Range Not Satisfiable\r\n"), + nxt_string("HTTP/1.1 417 Expectation Failed\r\n"), + nxt_string("HTTP/1.1 418\r\n"), + nxt_string("HTTP/1.1 419\r\n"), + nxt_string("HTTP/1.1 420\r\n"), + nxt_string("HTTP/1.1 421\r\n"), + nxt_string("HTTP/1.1 422\r\n"), + nxt_string("HTTP/1.1 423\r\n"), + nxt_string("HTTP/1.1 424\r\n"), + nxt_string("HTTP/1.1 425\r\n"), + nxt_string("HTTP/1.1 426\r\n"), + nxt_string("HTTP/1.1 427\r\n"), + nxt_string("HTTP/1.1 428\r\n"), + nxt_string("HTTP/1.1 429\r\n"), + nxt_string("HTTP/1.1 430\r\n"), + nxt_string("HTTP/1.1 431 Request Header Fields Too Large\r\n"), +}; + + +#define NXT_HTTP_LAST_SERVER_ERROR \ + (NXT_HTTP_INTERNAL_SERVER_ERROR + nxt_nitems(nxt_http_server_error) - 1) + +static const nxt_str_t nxt_http_server_error[] = { + nxt_string("HTTP/1.1 500 Internal Server Error\r\n"), + nxt_string("HTTP/1.1 501 Not Implemented\r\n"), + nxt_string("HTTP/1.1 502 Bad Gateway\r\n"), + nxt_string("HTTP/1.1 503 Service Unavailable\r\n"), + nxt_string("HTTP/1.1 504 Gateway Timeout\r\n"), +}; + + +#define UNKNOWN_STATUS_LENGTH (sizeof("HTTP/1.1 65536\r\n") - 1) + +static void +nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r) +{ + u_char *p; + size_t size; + nxt_buf_t *header; + nxt_str_t unknown_status; + nxt_int_t conn; + nxt_uint_t n; + nxt_bool_t http11; + nxt_conn_t *c; + nxt_h1proto_t *h1p; + const nxt_str_t *status; + nxt_http_field_t *field; + nxt_event_engine_t *engine; + u_char buf[UNKNOWN_STATUS_LENGTH]; + + static const char chunked[] = "Transfer-Encoding: chunked\r\n"; + + static const nxt_str_t connection[2] = { + nxt_string("Connection: close\r\n"), + nxt_string("Connection: keep-alive\r\n"), + }; + + nxt_debug(task, "h1p request header send"); + + r->header_sent = 1; + h1p = r->proto.h1; + n = r->status; + + if (n >= NXT_HTTP_OK && n <= NXT_HTTP_LAST_SUCCESS) { + status = &nxt_http_success[n - NXT_HTTP_OK]; + + } else if (n >= NXT_HTTP_MULTIPLE_CHOICES + && n <= NXT_HTTP_LAST_REDIRECTION) + { + status = &nxt_http_redirection[n - NXT_HTTP_MULTIPLE_CHOICES]; + + } else if (n >= NXT_HTTP_BAD_REQUEST && n <= NXT_HTTP_LAST_CLIENT_ERROR) { + status = &nxt_http_client_error[n - NXT_HTTP_BAD_REQUEST]; + + } else if (n >= NXT_HTTP_INTERNAL_SERVER_ERROR + && n <= NXT_HTTP_LAST_SERVER_ERROR) + { + status = &nxt_http_server_error[n - NXT_HTTP_INTERNAL_SERVER_ERROR]; + + } else { + p = nxt_sprintf(buf, buf + UNKNOWN_STATUS_LENGTH, + "HTTP/1.1 %03d\r\n", n); + + unknown_status.length = p - buf; + unknown_status.start = buf; + status = &unknown_status; + } + + size = status->length + sizeof("\r\n"); + size += sizeof("\r\n"); /* Trailing CRLF. */ + + http11 = (h1p->parser.version.str[7] != '0'); + + if (r->resp.content_length == NULL || r->resp.content_length->skip) { + if (http11) { + h1p->chunked = 1; + size += sizeof(chunked) - 1; + + } else { + h1p->keepalive = 0; + } + } + + conn = -1; + + if (http11 ^ h1p->keepalive) { + conn = h1p->keepalive; + size += connection[conn].length; + } + + nxt_list_each(field, r->resp.fields) { + + if (!field->skip) { + size += field->name_length + field->value_length; + size += sizeof(": \r\n") - 1; + } + + } nxt_list_loop; + + header = nxt_buf_mem_alloc(r->mem_pool, size, 0); + if (nxt_slow_path(header == NULL)) { + /* The internal server error is set just for logging. */ + r->status = NXT_HTTP_INTERNAL_SERVER_ERROR; + nxt_h1p_conn_close(task, h1p->conn, h1p); + return; + } + + p = header->mem.free; + + p = nxt_cpymem(p, status->start, status->length); + + nxt_list_each(field, r->resp.fields) { + + if (!field->skip) { + p = nxt_cpymem(p, field->name, field->name_length); + *p++ = ':'; *p++ = ' '; + p = nxt_cpymem(p, field->value, field->value_length); + *p++ = '\r'; *p++ = '\n'; + } + + } nxt_list_loop; + + if (conn >= 0) { + p = nxt_cpymem(p, connection[conn].start, connection[conn].length); + } + + if (h1p->chunked) { + p = nxt_cpymem(p, chunked, sizeof(chunked) - 1); + + } else { + *p++ = '\r'; *p++ = '\n'; + } + + header->mem.free = p; + + c = h1p->conn; + + c->write = header; + c->write_state = &nxt_h1p_send_state; + + engine = task->thread->engine; + + nxt_work_queue_add(&engine->fast_work_queue, r->state->ready_handler, + task, r, NULL); + + nxt_conn_write(engine, c); +} + + +static const nxt_conn_state_t nxt_h1p_send_state + nxt_aligned(64) = +{ + .ready_handler = nxt_h1p_sent, + .close_handler = nxt_h1p_conn_close, + .error_handler = nxt_h1p_conn_error, + + .timer_handler = nxt_h1p_conn_timeout, + .timer_value = nxt_h1p_timeout_value, + .timer_data = offsetof(nxt_socket_conf_t, send_timeout), + .timer_autoreset = 1, +}; + + +static void +nxt_h1p_request_send(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *out) +{ + nxt_conn_t *c; + + nxt_debug(task, "h1p request send"); + + c = r->proto.h1->conn; + + if (r->proto.h1->chunked) { + out = nxt_h1p_chunk_create(task, r, out); + if (nxt_slow_path(out == NULL)) { + nxt_h1p_conn_error(task, c, c->socket.data); + return; + } + } + + if (c->write == NULL) { + c->write = out; + c->write_state = &nxt_h1p_send_state; + + nxt_conn_write(task->thread->engine, c); + + } else { + nxt_buf_chain_add(&c->write, out); + } +} + + +static nxt_buf_t * +nxt_h1p_chunk_create(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *out) +{ + size_t size; + nxt_buf_t *b, **prev, *header, *tail; + + const size_t chunk_size = 2 * (sizeof("\r\n") - 1) + NXT_OFF_T_HEXLEN; + static const char tail_chunk[] = "\r\n0\r\n\r\n"; + + size = 0; + prev = &out; + + for (b = out; b != NULL; b = b->next) { + + if (nxt_buf_is_last(b)) { + tail = nxt_buf_mem_alloc(r->mem_pool, chunk_size, 0); + if (nxt_slow_path(tail == NULL)) { + return NULL; + } + + *prev = tail; + tail->next = b; + /* + * The tail_chunk size with trailing zero is 8 bytes, so + * memcpy may be inlined with just single 8 byte move operation. + */ + nxt_memcpy(tail->mem.free, tail_chunk, sizeof(tail_chunk)); + tail->mem.free += sizeof(tail_chunk) - 1; + + break; + } + + size += nxt_buf_used_size(b); + prev = &b->next; + } + + if (size == 0) { + return out; + } + + header = nxt_buf_mem_alloc(r->mem_pool, chunk_size, 0); + if (nxt_slow_path(header == NULL)) { + return NULL; + } + + header->next = out; + header->mem.free = nxt_sprintf(header->mem.free, header->mem.end, + "\r\n%xO\r\n", size); + return header; +} + + +static void +nxt_h1p_sent(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + nxt_event_engine_t *engine; + + c = obj; + + nxt_debug(task, "h1p sent"); + + engine = task->thread->engine; + + c->write = nxt_sendbuf_completion0(task, &engine->fast_work_queue, + c->write); + if (c->write != NULL) { + nxt_conn_write(engine, c); + } +} + + +static void +nxt_h1p_request_close(nxt_task_t *task, nxt_http_proto_t proto) +{ + nxt_conn_t *c; + nxt_h1proto_t *h1p; + + nxt_debug(task, "h1p request close"); + + h1p = proto.h1; + h1p->request = NULL; + + c = h1p->conn; + + if (h1p->keepalive) { + nxt_h1p_keepalive(task, h1p, c); + + } else { + nxt_h1p_close(task, c); + } +} + + +static void +nxt_h1p_keepalive(nxt_task_t *task, nxt_h1proto_t *h1p, nxt_conn_t *c) +{ + size_t size; + nxt_buf_t *in, *b, *next; + + nxt_debug(task, "h1p keepalive"); + + b = h1p->buffers; + + nxt_memzero(&h1p->parser, sizeof(nxt_h1proto_t)); + + h1p->conn = c; + in = c->read; + + size = nxt_buf_mem_used_size(&in->mem); + + if (size == 0) { + if (b != NULL) { + in = b; + c->read = in; + + for (b = b->next; b != NULL; b = next) { + next = b->next; + nxt_mp_free(c->mem_pool, b); + } + + in->next = NULL; + } + + in->mem.pos = in->mem.start; + in->mem.free = in->mem.start; + + if (c->socket.read_ready) { + c->read_state = &nxt_h1p_read_header_state; + nxt_conn_read(task->thread->engine, c); + + } else { + c->read_state = &nxt_h1p_idle_state; + nxt_conn_wait(c); + } + + } else { + nxt_debug(task, "h1p pipelining"); + + if (b != NULL) { + do { + next = b->next; + nxt_mp_free(c->mem_pool, b); + b = next; + } while (b != in); + } + + nxt_memmove(in->mem.start, in->mem.pos, size); + + in->mem.pos = in->mem.start; + in->mem.free = in->mem.start + size; + + nxt_h1p_header_parse(task, c, c->socket.data); + } +} + + +static void +nxt_h1p_close(nxt_task_t *task, nxt_conn_t *c) +{ + nxt_debug(task, "h1p close"); + + c->socket.data = NULL; + + if (c->socket.fd != -1) { + c->write_state = &nxt_router_conn_close_state; + + nxt_conn_close(task->thread->engine, c); + } +} + + +static void +nxt_h1p_conn_close(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + nxt_h1proto_t *h1p; + nxt_http_request_t *r; + + c = obj; + h1p = data; + + nxt_debug(task, "h1p conn close"); + + if (h1p != NULL) { + r = h1p->request; + + if (r != NULL) { + r->state->error_handler(task, r, r->proto.h1); + return; + } + } + + nxt_h1p_close(task, c); +} + + +static void +nxt_h1p_conn_error(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + nxt_h1proto_t *h1p; + + c = obj; + h1p = data; + + nxt_debug(task, "h1p conn error"); + + nxt_h1p_conn_close(task, c, h1p); +} + + +static void +nxt_h1p_conn_timeout(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + nxt_timer_t *timer; + + timer = obj; + + nxt_debug(task, "h1p conn timeout"); + + c = nxt_read_timer_conn(timer); + + nxt_h1p_conn_close(task, c, c->socket.data); +} + + +static nxt_msec_t +nxt_h1p_timeout_value(nxt_conn_t *c, uintptr_t data) +{ + nxt_socket_conf_joint_t *joint; + + joint = c->joint; + + return nxt_value_at(nxt_msec_t, joint->socket_conf, data); +} diff --git a/src/nxt_http.h b/src/nxt_http.h new file mode 100644 index 00000000..c6704d68 --- /dev/null +++ b/src/nxt_http.h @@ -0,0 +1,181 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + +#ifndef _NXT_HTTP_H_INCLUDED_ +#define _NXT_HTTP_H_INCLUDED_ + + +typedef enum { + NXT_HTTP_INVALID = 0, + + NXT_HTTP_OK = 200, + + NXT_HTTP_MULTIPLE_CHOICES = 300, + NXT_HTTP_MOVED_PERMANENTLY = 301, + NXT_HTTP_FOUND = 302, + NXT_HTTP_SEE_OTHER = 303, + NXT_HTTP_NOT_MODIFIED = 304, + + NXT_HTTP_BAD_REQUEST = 400, + NXT_HTTP_LENGTH_REQUIRED = 411, + NXT_HTTP_PAYLOAD_TOO_LARGE = 413, + NXT_HTTP_URI_TOO_LONG = 414, + NXT_HTTP_REQUEST_HEADER_FIELDS_TOO_LARGE = 431, + + NXT_HTTP_INTERNAL_SERVER_ERROR = 500, + NXT_HTTP_NOT_IMPLEMENTED = 501, + NXT_HTTP_BAD_GATEWAY = 502, + NXT_HTTP_SERVICE_UNAVAILABLE = 503, + NXT_HTTP_GATEWAY_TIMEOUT = 504, +} nxt_http_status_t; + + +typedef enum { + NXT_HTTP_TE_NONE = 0, + NXT_HTTP_TE_CHUNKED = 1, + NXT_HTTP_TE_UNSUPPORTED = 2, +} nxt_http_te_t; + + +typedef struct { + nxt_work_handler_t ready_handler; + nxt_work_handler_t error_handler; +} nxt_http_request_state_t; + + +typedef struct nxt_http_request_s nxt_http_request_t; + + +typedef struct { + nxt_http_request_parse_t parser; + + uint8_t nbuffers; + uint8_t keepalive; /* 1 bit */ + uint8_t chunked; /* 1 bit */ + nxt_http_te_t transfer_encoding:8; /* 2 bits */ + + nxt_http_request_t *request; + nxt_conn_t *conn; + nxt_buf_t *buffers; +} nxt_h1proto_t; + + +typedef union { + void *any; + nxt_h1proto_t *h1; +} nxt_http_proto_t; + + +#define nxt_http_field_name_set(_field, _name) \ + do { \ + (_field)->name_length = sizeof(_name) - 1; \ + (_field)->name = (u_char *) _name; \ + } while (0) + + +#define nxt_http_field_set(_field, _name, _value) \ + do { \ + (_field)->name_length = sizeof(_name) - 1; \ + (_field)->value_length = sizeof(_value) - 1; \ + (_field)->name = (u_char *) _name; \ + (_field)->value = (u_char *) _value; \ + } while (0) + + +typedef struct { + nxt_list_t *fields; + nxt_http_field_t *content_type; + nxt_http_field_t *content_length; + nxt_off_t content_length_n; +} nxt_http_response_t; + + +struct nxt_http_request_s { + nxt_http_proto_t proto; + nxt_socket_conf_t *socket_conf; + + nxt_mp_t *mem_pool; + + nxt_buf_t *body; + nxt_buf_t *out; + const nxt_http_request_state_t *state; + + nxt_str_t target; + nxt_str_t version; + nxt_str_t *method; + nxt_str_t *path; + nxt_str_t *args; + + nxt_list_t *fields; + nxt_http_field_t *host; + nxt_http_field_t *content_type; + nxt_http_field_t *content_length; + nxt_http_field_t *cookie; + nxt_off_t content_length_n; + nxt_off_t rest_length; + + nxt_sockaddr_t *remote; + nxt_sockaddr_t *local; + + nxt_http_response_t resp; + + nxt_http_status_t status:16; + + uint8_t protocol; /* 2 bits */ + uint8_t logged; /* 1 bit */ + uint8_t header_sent; /* 1 bit */ +}; + + +typedef void (*nxt_http_proto_body_read_t)(nxt_task_t *task, + nxt_http_request_t *r); +typedef void (*nxt_http_proto_local_addr_t)(nxt_task_t *task, + nxt_http_request_t *r); +typedef void (*nxt_http_proto_header_send_t)(nxt_task_t *task, + nxt_http_request_t *r); +typedef void (*nxt_http_proto_send_t)(nxt_task_t *task, nxt_http_request_t *r, + nxt_buf_t *out); +typedef void (*nxt_http_proto_close_t)(nxt_task_t *task, + nxt_http_proto_t proto); + + +nxt_int_t nxt_http_init(nxt_task_t *task, nxt_runtime_t *rt); +nxt_int_t nxt_h1p_init(nxt_task_t *task, nxt_runtime_t *rt); +nxt_int_t nxt_http_response_hash_init(nxt_task_t *task, nxt_runtime_t *rt); + +void nxt_http_conn_init(nxt_task_t *task, void *obj, void *data); +nxt_http_request_t *nxt_http_request_create(nxt_task_t *task); +void nxt_http_request_error(nxt_task_t *task, nxt_http_request_t *r, + nxt_http_status_t status); +void nxt_http_request_read_body(nxt_task_t *task, nxt_http_request_t *r); +void nxt_http_request_local_addr(nxt_task_t *task, nxt_http_request_t *r); +void nxt_http_request_header_send(nxt_task_t *task, nxt_http_request_t *r); +void nxt_http_request_send(nxt_task_t *task, nxt_http_request_t *r, + nxt_buf_t *out); +void nxt_http_request_release(nxt_task_t *task, nxt_http_request_t *r); +nxt_buf_t *nxt_http_request_last_buffer(nxt_task_t *task, + nxt_http_request_t *r); +void nxt_http_request_close_handler(nxt_task_t *task, void *obj, void *data); + +nxt_int_t nxt_http_request_host(void *ctx, nxt_http_field_t *field, + uintptr_t data); +nxt_int_t nxt_http_request_field(void *ctx, nxt_http_field_t *field, + uintptr_t offset); +nxt_int_t nxt_http_request_content_length(void *ctx, nxt_http_field_t *field, + uintptr_t data); + + +extern nxt_lvlhsh_t nxt_response_fields_hash; +extern const nxt_conn_state_t nxt_router_conn_close_state; + +extern const nxt_http_proto_body_read_t nxt_http_proto_body_read[]; +extern const nxt_http_proto_local_addr_t nxt_http_proto_local_addr[]; +extern const nxt_http_proto_header_send_t nxt_http_proto_header_send[]; +extern const nxt_http_proto_send_t nxt_http_proto_send[]; +extern const nxt_http_proto_close_t nxt_http_proto_close[]; + + +#endif /* _NXT_HTTP_H_INCLUDED_ */ diff --git a/src/nxt_http_error.c b/src/nxt_http_error.c new file mode 100644 index 00000000..5030264b --- /dev/null +++ b/src/nxt_http_error.c @@ -0,0 +1,105 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + +#include <nxt_router.h> +#include <nxt_http.h> + + +static void nxt_http_request_send_error_body(nxt_task_t *task, void *r, + void *data); + + +static const nxt_http_request_state_t nxt_http_request_send_state; + + +static const char error[] = + "<html><head><title>Error</title><head>" + "<body>Error.</body></html>\r\n"; + + +void +nxt_http_request_error(nxt_task_t *task, nxt_http_request_t *r, + nxt_http_status_t status) +{ + nxt_http_field_t *content_type; + + nxt_debug(task, "http request error: %d", status); + + if (r->header_sent) { + goto fail; + } + + r->status = status; + + r->resp.fields = nxt_list_create(r->mem_pool, 8, sizeof(nxt_http_field_t)); + if (nxt_slow_path(r == NULL)) { + goto fail; + } + + content_type = nxt_list_zero_add(r->resp.fields); + if (nxt_slow_path(content_type == NULL)) { + goto fail; + } + + nxt_http_field_set(content_type, "Content-Type", "text/html"); + + r->resp.content_length = NULL; + r->resp.content_length_n = sizeof(error) - 1; + + r->state = &nxt_http_request_send_state; + + nxt_http_request_header_send(task, r); + return; + +fail: + + nxt_http_request_release(task, r); +} + + +static const nxt_http_request_state_t nxt_http_request_send_state + nxt_aligned(64) = +{ + .ready_handler = nxt_http_request_send_error_body, + .error_handler = nxt_http_request_close_handler, +}; + + +static void +nxt_http_request_send_error_body(nxt_task_t *task, void *obj, void *data) +{ + nxt_buf_t *out, *last; + nxt_http_request_t *r; + + r = obj; + + nxt_debug(task, "http request send error body"); + + out = nxt_buf_mem_alloc(r->mem_pool, 0, 0); + if (nxt_slow_path(out == NULL)) { + goto fail; + } + + out->mem.start = (u_char *) error; + out->mem.pos = out->mem.start; + out->mem.free = out->mem.start + sizeof(error) - 1; + out->mem.end = out->mem.free; + + last = nxt_http_request_last_buffer(task, r); + if (nxt_slow_path(last == NULL)) { + goto fail; + } + + out->next = last; + + nxt_http_request_send(task, r, out); + + return; + +fail: + // TODO + nxt_http_request_release(task, r); +} diff --git a/src/nxt_http_request.c b/src/nxt_http_request.c new file mode 100644 index 00000000..eab61814 --- /dev/null +++ b/src/nxt_http_request.c @@ -0,0 +1,384 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + +#include <nxt_router.h> +#include <nxt_http.h> + + +static void nxt_http_request_start(nxt_task_t *task, void *obj, void *data); +static void nxt_http_app_request(nxt_task_t *task, void *obj, void *data); +static void nxt_http_request_done(nxt_task_t *task, void *obj, void *data); + + +static const nxt_http_request_state_t nxt_http_request_init_state; +static const nxt_http_request_state_t nxt_http_request_body_state; + + +nxt_int_t +nxt_http_init(nxt_task_t *task, nxt_runtime_t *rt) +{ + nxt_int_t ret; + + ret = nxt_h1p_init(task, rt); + + if (ret != NXT_OK) { + return ret; + } + + return nxt_http_response_hash_init(task, rt); +} + + +nxt_int_t +nxt_http_request_host(void *ctx, nxt_http_field_t *field, uintptr_t data) +{ + nxt_http_request_t *r; + + r = ctx; + + /* TODO: validate host. */ + + r->host = field; + + return NXT_OK; +} + + +nxt_int_t +nxt_http_request_field(void *ctx, nxt_http_field_t *field, uintptr_t offset) +{ + nxt_http_request_t *r; + + r = ctx; + + nxt_value_at(nxt_http_field_t *, r, offset) = field; + + return NXT_OK; +} + + +nxt_int_t +nxt_http_request_content_length(void *ctx, nxt_http_field_t *field, + uintptr_t data) +{ + nxt_http_request_t *r; + + r = ctx; + + r->content_length = field; + r->content_length_n = nxt_off_t_parse(field->value, field->value_length); + + return NXT_OK; +} + + +nxt_http_request_t * +nxt_http_request_create(nxt_task_t *task) +{ + nxt_mp_t *mp; + nxt_http_request_t *r; + + mp = nxt_mp_create(1024, 128, 256, 32); + if (nxt_slow_path(mp == NULL)) { + return NULL; + } + + r = nxt_mp_zget(mp, sizeof(nxt_http_request_t)); + if (nxt_slow_path(r == NULL)) { + goto fail; + } + + r->resp.fields = nxt_list_create(mp, 8, sizeof(nxt_http_field_t)); + if (nxt_slow_path(r == NULL)) { + goto fail; + } + + r->mem_pool = mp; + r->content_length_n = -1; + r->resp.content_length_n = -1; + r->state = &nxt_http_request_init_state; + + return r; + +fail: + + nxt_mp_release(mp); + return NULL; +} + + +static const nxt_http_request_state_t nxt_http_request_init_state + nxt_aligned(64) = +{ + .ready_handler = nxt_http_request_start, + .error_handler = nxt_http_request_close_handler, +}; + + +static void +nxt_http_request_start(nxt_task_t *task, void *obj, void *data) +{ + nxt_http_request_t *r; + + r = obj; + + r->state = &nxt_http_request_body_state; + + nxt_http_request_read_body(task, r); +} + + +static const nxt_http_request_state_t nxt_http_request_body_state + nxt_aligned(64) = +{ + .ready_handler = nxt_http_app_request, + .error_handler = nxt_http_request_close_handler, +}; + + +static void +nxt_http_app_request(nxt_task_t *task, void *obj, void *data) +{ + nxt_int_t ret; + nxt_event_engine_t *engine; + nxt_http_request_t *r; + nxt_app_parse_ctx_t *ar; + + r = obj; + + ar = nxt_mp_zget(r->mem_pool, sizeof(nxt_app_parse_ctx_t)); + if (nxt_slow_path(ar == NULL)) { + nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); + return; + } + + ar->request = r; + ar->mem_pool = r->mem_pool; + nxt_mp_retain(r->mem_pool); + + // STUB + engine = task->thread->engine; + ar->timer.task = &engine->task; + ar->timer.work_queue = &engine->fast_work_queue; + ar->timer.log = engine->task.log; + ar->timer.precision = NXT_TIMER_DEFAULT_PRECISION; + + ar->r.remote.start = nxt_sockaddr_address(r->remote); + ar->r.remote.length = r->remote->address_length; + + /* + * TODO: need an application flag to get local address + * required by "SERVER_ADDR" in Pyhton and PHP. Not used in Go. + */ + nxt_http_request_local_addr(task, r); + + if (nxt_fast_path(r->local != NULL)) { + ar->r.local.start = nxt_sockaddr_address(r->local); + ar->r.local.length = r->local->address_length; + } + + ar->r.header.fields = r->fields; + ar->r.header.done = 1; + ar->r.header.version = r->version; + + if (r->method != NULL) { + ar->r.header.method = *r->method; + } + + ar->r.header.target = r->target; + + if (r->path != NULL) { + ar->r.header.path = *r->path; + } + + if (r->args != NULL) { + ar->r.header.query = *r->args; + } + + if (r->host != NULL) { + ar->r.header.host.length = r->host->value_length; + ar->r.header.host.start = r->host->value; + } + + if (r->content_type != NULL) { + ar->r.header.content_type.length = r->content_type->value_length; + ar->r.header.content_type.start = r->content_type->value; + } + + if (r->content_length != NULL) { + ar->r.header.content_length.length = r->content_length->value_length; + ar->r.header.content_length.start = r->content_length->value; + } + + if (r->cookie != NULL) { + ar->r.header.cookie.length = r->cookie->value_length; + ar->r.header.cookie.start = r->cookie->value; + } + + ar->r.body.done = 1; + + ret = nxt_http_parse_request_init(&ar->resp_parser, r->mem_pool); + if (nxt_slow_path(ret != NXT_OK)) { + nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); + return; + } + + nxt_router_process_http_request(task, ar); +} + + +void +nxt_http_request_read_body(nxt_task_t *task, nxt_http_request_t *r) +{ + if (r->proto.any != NULL) { + nxt_http_proto_body_read[r->protocol](task, r); + } +} + + +void +nxt_http_request_local_addr(nxt_task_t *task, nxt_http_request_t *r) +{ + if (r->proto.any != NULL) { + nxt_http_proto_local_addr[r->protocol](task, r); + } +} + + +void +nxt_http_request_header_send(nxt_task_t *task, nxt_http_request_t *r) +{ + u_char *p, *end; + nxt_http_field_t *server, *content_length; + + /* + * TODO: "Server" and "Content-Length" processing should be moved + * to the last header filter. + */ + + server = nxt_list_zero_add(r->resp.fields); + if (nxt_slow_path(server == NULL)) { + goto fail; + } + + nxt_http_field_set(server, "Server", "unit/" NXT_VERSION); + + if (r->resp.content_length_n != -1 + && (r->resp.content_length == NULL || r->resp.content_length->skip)) + { + content_length = nxt_list_zero_add(r->resp.fields); + if (nxt_slow_path(content_length == NULL)) { + goto fail; + } + + nxt_http_field_name_set(content_length, "Content-Length"); + + p = nxt_mp_nget(r->mem_pool, NXT_OFF_T_LEN); + if (nxt_slow_path(p == NULL)) { + goto fail; + } + + content_length->value = p; + end = nxt_sprintf(p, p + NXT_OFF_T_LEN, "%O", r->resp.content_length_n); + content_length->value_length = end - p; + + r->resp.content_length = content_length; + } + + if (r->proto.any != NULL) { + nxt_http_proto_header_send[r->protocol](task, r); + } + + return; + +fail: + + nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); +} + + +void +nxt_http_request_send(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *out) +{ + if (r->proto.any != NULL) { + nxt_http_proto_send[r->protocol](task, r, out); + } +} + + +nxt_buf_t * +nxt_http_request_last_buffer(nxt_task_t *task, nxt_http_request_t *r) +{ + nxt_buf_t *b; + + b = nxt_buf_mem_alloc(r->mem_pool, 0, 0); + + if (nxt_fast_path(b != NULL)) { + nxt_buf_set_sync(b); + nxt_buf_set_last(b); + b->completion_handler = nxt_http_request_done; + b->parent = r; + + } else { + nxt_http_request_release(task, r); + } + + return b; +} + + +static void +nxt_http_request_done(nxt_task_t *task, void *obj, void *data) +{ + nxt_http_request_t *r; + + r = data; + + nxt_debug(task, "http request done"); + + nxt_http_request_close_handler(task, r, r->proto.any); +} + + +void +nxt_http_request_release(nxt_task_t *task, nxt_http_request_t *r) +{ + nxt_debug(task, "http request release"); + + nxt_work_queue_add(&task->thread->engine->fast_work_queue, + nxt_http_request_close_handler, task, r, r->proto.any); +} + + +void +nxt_http_request_close_handler(nxt_task_t *task, void *obj, void *data) +{ + nxt_http_proto_t proto; + nxt_http_request_t *r; + nxt_http_proto_close_t handler; + + r = obj; + proto.any = data; + + nxt_debug(task, "http request close handler"); + + if (!r->logged) { + r->logged = 1; + // STUB + nxt_debug(task, "http request log: \"%*s \"%V %V %V\" %d\"", + r->remote->address_length, nxt_sockaddr_address(r->remote), + r->method, &r->target, &r->version, r->status); + } + + handler = nxt_http_proto_close[r->protocol]; + + r->proto.any = NULL; + nxt_mp_release(r->mem_pool); + + if (proto.any != NULL) { + handler(task, proto); + } +} diff --git a/src/nxt_http_response.c b/src/nxt_http_response.c new file mode 100644 index 00000000..330890c1 --- /dev/null +++ b/src/nxt_http_response.c @@ -0,0 +1,83 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + +#include <nxt_router.h> +#include <nxt_http.h> + + +static nxt_int_t nxt_http_response_status(void *ctx, nxt_http_field_t *field, + uintptr_t data); +static nxt_int_t nxt_http_response_skip(void *ctx, nxt_http_field_t *field, + uintptr_t data); +static nxt_int_t nxt_http_response_field(void *ctx, nxt_http_field_t *field, + uintptr_t offset); + + +nxt_lvlhsh_t nxt_response_fields_hash; + +static nxt_http_field_proc_t nxt_response_fields[] = { + { nxt_string("Status"), &nxt_http_response_status, 0 }, + { nxt_string("Server"), &nxt_http_response_skip, 0 }, + { nxt_string("Connection"), &nxt_http_response_skip, 0 }, + { nxt_string("Content-Type"), &nxt_http_response_field, + offsetof(nxt_http_request_t, resp.content_type) }, + { nxt_string("Content-Length"), &nxt_http_response_field, + offsetof(nxt_http_request_t, resp.content_length) }, +}; + + +nxt_int_t +nxt_http_response_hash_init(nxt_task_t *task, nxt_runtime_t *rt) +{ + return nxt_http_fields_hash(&nxt_response_fields_hash, rt->mem_pool, + nxt_response_fields, nxt_nitems(nxt_response_fields)); +} + + +nxt_int_t +nxt_http_response_status(void *ctx, nxt_http_field_t *field, + uintptr_t data) +{ + nxt_int_t status; + nxt_http_request_t *r; + + r = ctx; + + field->skip = 1; + + if (field->value_length >= 3) { + status = nxt_int_parse(field->value, 3); + + if (status >= 100 && status <= 999) { + r->status = status; + return NXT_OK; + } + } + + return NXT_ERROR; +} + + +nxt_int_t +nxt_http_response_skip(void *ctx, nxt_http_field_t *field, uintptr_t data) +{ + field->skip = 1; + + return NXT_OK; +} + + +nxt_int_t +nxt_http_response_field(void *ctx, nxt_http_field_t *field, uintptr_t offset) +{ + nxt_http_request_t *r; + + r = ctx; + + nxt_value_at(nxt_http_field_t *, r, offset) = field; + + return NXT_OK; +} diff --git a/src/nxt_php_sapi.c b/src/nxt_php_sapi.c index 14427b03..10b5a4e9 100644 --- a/src/nxt_php_sapi.c +++ b/src/nxt_php_sapi.c @@ -505,19 +505,11 @@ nxt_php_send_headers(sapi_headers_struct *sapi_headers TSRMLS_DC) zend_llist_position zpos; static const u_char default_repsonse[] - = "HTTP/1.1 200 OK\r\n" - "Server: unit/" NXT_VERSION "\r\n" - "Content-Type: text/html; charset=UTF-8\r\n" - "Connection: close\r\n" + = "Status: 200\r\n" "\r\n"; - static const u_char default_headers[] - = "Server: unit/" NXT_VERSION "\r\n" - "Connection: close\r\n"; - - static const u_char http_11[] = "HTTP/1.1 "; + static const u_char status_200[] = "Status: 200"; static const u_char cr_lf[] = "\r\n"; - static const u_char _200_ok[] = "200 OK"; ctx = SG(server_context); @@ -539,23 +531,26 @@ nxt_php_send_headers(sapi_headers_struct *sapi_headers TSRMLS_DC) status = (u_char *) SG(sapi_headers).http_status_line; len = nxt_strlen(status); - RC(nxt_php_write(ctx, status, len, 0, 0)); + if (len < 12) { + goto fail; + } + + RC(nxt_php_write(ctx, status_200, sizeof(status_200) - 4, 0, 0)); + RC(nxt_php_write(ctx, status + 9, 3, 0, 0)); } else if (SG(sapi_headers).http_response_code) { status = nxt_sprintf(buf, buf + sizeof(buf), "%03d", SG(sapi_headers).http_response_code); len = status - buf; - RC(nxt_php_write(ctx, http_11, sizeof(http_11) - 1, 0, 0)); + RC(nxt_php_write(ctx, status_200, sizeof(status_200) - 4, 0, 0)); RC(nxt_php_write(ctx, buf, len, 0, 0)); } else { - RC(nxt_php_write(ctx, http_11, sizeof(http_11) - 1, 0, 0)); - RC(nxt_php_write(ctx, _200_ok, sizeof(_200_ok) - 1, 0, 0)); + RC(nxt_php_write(ctx, status_200, sizeof(status_200) - 1, 0, 0)); } RC(nxt_php_write(ctx, cr_lf, sizeof(cr_lf) - 1, 0, 0)); - RC(nxt_php_write(ctx, default_headers, sizeof(default_headers) - 1, 0, 0)); h = zend_llist_get_first_ex(&sapi_headers->headers, &zpos); diff --git a/src/nxt_python_wsgi.c b/src/nxt_python_wsgi.c index 0f56ea37..ae0a60d9 100644 --- a/src/nxt_python_wsgi.c +++ b/src/nxt_python_wsgi.c @@ -775,11 +775,7 @@ nxt_py_start_resp(PyObject *self, PyObject *args) nxt_uint_t i, n; nxt_python_run_ctx_t *ctx; - static const u_char resp[] = "HTTP/1.1 "; - - static const u_char default_headers[] - = "Server: unit/" NXT_VERSION "\r\n" - "Connection: close\r\n"; + static const u_char status[] = "Status: "; static const u_char cr_lf[] = "\r\n"; static const u_char sc_sp[] = ": "; @@ -794,7 +790,7 @@ nxt_py_start_resp(PyObject *self, PyObject *args) ctx = nxt_python_run_ctx; - nxt_python_write(ctx, resp, sizeof(resp) - 1, 0, 0); + nxt_python_write(ctx, status, sizeof(status) - 1, 0, 0); rc = nxt_python_write_py_str(ctx, string, 0, 0); if (nxt_slow_path(rc != NXT_OK)) { @@ -804,8 +800,6 @@ nxt_py_start_resp(PyObject *self, PyObject *args) nxt_python_write(ctx, cr_lf, sizeof(cr_lf) - 1, 0, 0); - nxt_python_write(ctx, default_headers, sizeof(default_headers) - 1, 0, 0); - headers = PyTuple_GET_ITEM(args, 1); if (!PyList_Check(headers)) { diff --git a/src/nxt_router.c b/src/nxt_router.c index 66ad6478..54e70a1a 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -7,6 +7,7 @@ #include <nxt_router.h> #include <nxt_conf.h> +#include <nxt_http.h> typedef struct { @@ -35,15 +36,14 @@ typedef struct nxt_req_app_link_s nxt_req_app_link_t; typedef struct { - uint32_t stream; - nxt_conn_t *conn; - nxt_app_t *app; - nxt_port_t *app_port; - nxt_app_parse_ctx_t *ap; - nxt_msg_info_t msg_info; - nxt_req_app_link_t *ra; + uint32_t stream; + nxt_app_t *app; + nxt_port_t *app_port; + nxt_app_parse_ctx_t *ap; + nxt_msg_info_t msg_info; + nxt_req_app_link_t *ra; - nxt_queue_link_t link; /* for nxt_conn_t.requests */ + nxt_queue_link_t link; /* for nxt_conn_t.requests */ } nxt_req_conn_link_t; @@ -199,14 +199,6 @@ static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra); -static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data); -static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, - void *data); -static nxt_sockaddr_t *nxt_router_local_addr(nxt_task_t *task, nxt_conn_t *c); -static void nxt_router_conn_http_body_read(nxt_task_t *task, void *obj, - void *data); -static void nxt_router_process_http_request(nxt_task_t *task, - nxt_conn_t *c, nxt_app_parse_ctx_t *ap); static void nxt_router_app_prepare_request(nxt_task_t *task, nxt_req_app_link_t *ra); static nxt_int_t nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, @@ -215,16 +207,11 @@ static nxt_int_t nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, nxt_app_wmsg_t *wmsg); static nxt_int_t nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, nxt_app_wmsg_t *wmsg); -static void nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data); -static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data); -static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data); -static void nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data); static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data); -static nxt_msec_t nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data); -static void nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code, - const char* str); +static const nxt_http_request_state_t nxt_http_request_send_state; +static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data); static nxt_router_t *nxt_router; @@ -245,7 +232,7 @@ nxt_router_start(nxt_task_t *task, void *data) rt = task->thread->runtime; - ret = nxt_app_http_init(task, rt); + ret = nxt_http_init(task, rt); if (nxt_slow_path(ret != NXT_OK)) { return ret; } @@ -502,9 +489,8 @@ nxt_router_ra_update_peer(nxt_task_t *task, nxt_req_app_link_t *ra) static void nxt_router_ra_release(nxt_task_t *task, nxt_req_app_link_t *ra) { - nxt_mp_t *mp; - nxt_conn_t *c; - nxt_req_conn_link_t *rc; + nxt_mp_t *mp; + nxt_req_conn_link_t *rc; nxt_assert(task->thread->engine == ra->work.data); nxt_assert(ra->use_count == 0); @@ -514,18 +500,16 @@ nxt_router_ra_release(nxt_task_t *task, nxt_req_app_link_t *ra) rc = ra->rc; if (rc != NULL) { - c = rc->conn; - if (nxt_slow_path(ra->err_code != 0)) { - nxt_router_gen_error(task, c, ra->err_code, ra->err_str); + nxt_http_request_error(task, rc->ap->request, ra->err_code); } else { rc->app_port = ra->app_port; rc->msg_info = ra->msg_info; if (rc->app->timeout != 0) { - c->read_timer.handler = nxt_router_app_timeout; - nxt_timer_add(task->thread->engine, &c->read_timer, + rc->ap->timer.handler = nxt_router_app_timeout; + nxt_timer_add(task->thread->engine, &rc->ap->timer, rc->app->timeout); } @@ -693,10 +677,6 @@ nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc) rc->ap = NULL; } - - nxt_queue_remove(&rc->link); - - rc->conn = NULL; } @@ -1079,6 +1059,12 @@ static nxt_conf_map_t nxt_router_http_conf[] = { }, { + nxt_string("idle_timeout"), + NXT_CONF_MAP_MSEC, + offsetof(nxt_socket_conf_t, idle_timeout), + }, + + { nxt_string("header_read_timeout"), NXT_CONF_MAP_MSEC, offsetof(nxt_socket_conf_t, header_read_timeout), @@ -1089,6 +1075,12 @@ static nxt_conf_map_t nxt_router_http_conf[] = { NXT_CONF_MAP_MSEC, offsetof(nxt_socket_conf_t, body_read_timeout), }, + + { + nxt_string("send_timeout"), + NXT_CONF_MAP_MSEC, + offsetof(nxt_socket_conf_t, send_timeout), + }, }; @@ -1296,8 +1288,10 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, skcf->large_header_buffers = 4; skcf->body_buffer_size = 16 * 1024; skcf->max_body_size = 2 * 1024 * 1024; + skcf->idle_timeout = 65000; skcf->header_read_timeout = 5000; skcf->body_read_timeout = 5000; + skcf->send_timeout = 5000; if (http != NULL) { ret = nxt_conf_map_object(mp, http, nxt_router_http_conf, @@ -1308,7 +1302,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, } } - skcf->listen->handler = nxt_router_conn_init; + skcf->listen->handler = nxt_http_conn_init; skcf->router_conf = tmcf->conf; skcf->router_conf->count++; skcf->application = nxt_router_listener_application(tmcf, @@ -2377,92 +2371,20 @@ nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data) } -static const nxt_conn_state_t nxt_router_conn_read_header_state - nxt_aligned(64) = -{ - .ready_handler = nxt_router_conn_http_header_parse, - .close_handler = nxt_router_conn_close, - .error_handler = nxt_router_conn_error, - - .timer_handler = nxt_router_conn_timeout, - .timer_value = nxt_router_conn_timeout_value, - .timer_data = offsetof(nxt_socket_conf_t, header_read_timeout), -}; - - -static const nxt_conn_state_t nxt_router_conn_read_body_state - nxt_aligned(64) = -{ - .ready_handler = nxt_router_conn_http_body_read, - .close_handler = nxt_router_conn_close, - .error_handler = nxt_router_conn_error, - - .timer_handler = nxt_router_conn_timeout, - .timer_value = nxt_router_conn_timeout_value, - .timer_data = offsetof(nxt_socket_conf_t, body_read_timeout), - .timer_autoreset = 1, -}; - - -static void -nxt_router_conn_init(nxt_task_t *task, void *obj, void *data) -{ - size_t size; - nxt_conn_t *c; - nxt_socket_conf_t *skcf; - nxt_event_engine_t *engine; - nxt_socket_conf_joint_t *joint; - - c = obj; - joint = data; - - nxt_debug(task, "router conn init"); - - c->joint = joint; - joint->count++; - - skcf = joint->socket_conf; - c->local = skcf->sockaddr; - - size = skcf->header_buffer_size; - c->read = nxt_buf_mem_alloc(c->mem_pool, size, 0); - - c->socket.data = NULL; - - engine = task->thread->engine; - c->read_work_queue = &engine->fast_work_queue; - c->write_work_queue = &engine->fast_work_queue; - - c->read_state = &nxt_router_conn_read_header_state; - - nxt_conn_read(engine, c); -} - - -static const nxt_conn_state_t nxt_router_conn_write_state - nxt_aligned(64) = -{ - .ready_handler = nxt_router_conn_ready, - .close_handler = nxt_router_conn_close, - .error_handler = nxt_router_conn_error, -}; - - static void nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) { size_t dump_size; + nxt_int_t ret; nxt_buf_t *b, *last; - nxt_conn_t *c; - nxt_event_engine_t *engine; + nxt_http_request_t *r; nxt_req_conn_link_t *rc; + nxt_app_parse_ctx_t *ar; b = msg->buf; rc = data; - c = rc->conn; - dump_size = nxt_buf_used_size(b); if (dump_size > 300) { @@ -2477,16 +2399,16 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, b = NULL; } - engine = task->thread->engine; - - nxt_timer_disable(engine, &c->read_timer); + ar = rc->ap; if (msg->port_msg.last != 0) { nxt_debug(task, "router data create last buf"); - last = nxt_buf_sync_alloc(c->mem_pool, NXT_BUF_SYNC_LAST); + last = nxt_http_request_last_buffer(task, ar->request); if (nxt_slow_path(last == NULL)) { - /* TODO pogorevaTb */ + nxt_app_http_req_done(task, ar); + nxt_router_rc_unlink(task, rc); + return; } nxt_buf_chain_add(&b, last); @@ -2495,8 +2417,8 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, } else { if (rc->app->timeout != 0) { - c->read_timer.handler = nxt_router_app_timeout; - nxt_timer_add(engine, &c->read_timer, rc->app->timeout); + ar->timer.handler = nxt_router_app_timeout; + nxt_timer_add(task->thread->engine, &ar->timer, rc->app->timeout); } } @@ -2509,16 +2431,67 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, msg->buf = NULL; } - if (c->write == NULL) { - c->write = b; - c->write_state = &nxt_router_conn_write_state; + r = ar->request; - nxt_conn_write(task->thread->engine, c); + if (r->header_sent) { + nxt_buf_chain_add(&r->out, b); + nxt_http_request_send_body(task, r, NULL); } else { - nxt_debug(task, "router data attach out bufs to existing chain"); + ret = nxt_http_parse_fields(&ar->resp_parser, &b->mem); + if (nxt_slow_path(ret != NXT_DONE)) { + goto fail; + } + + r->resp.fields = ar->resp_parser.fields; - nxt_buf_chain_add(&c->write, b); + ret = nxt_http_fields_process(r->resp.fields, + &nxt_response_fields_hash, r); + if (nxt_slow_path(ret != NXT_OK)) { + goto fail; + } + + if (nxt_buf_mem_used_size(&b->mem) != 0) { + nxt_buf_chain_add(&r->out, b); + } + + r->state = &nxt_http_request_send_state; + + nxt_http_request_header_send(task, r); + } + + return; + +fail: + + nxt_app_http_req_done(task, ar); + nxt_router_rc_unlink(task, rc); + + nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE); +} + + +static const nxt_http_request_state_t nxt_http_request_send_state + nxt_aligned(64) = +{ + .ready_handler = nxt_http_request_send_body, + .error_handler = nxt_http_request_close_handler, +}; + + +static void +nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data) +{ + nxt_buf_t *out; + nxt_http_request_t *r; + + r = obj; + + out = r->out; + + if (out != NULL) { + r->out = NULL; + nxt_http_request_send(task, r, out); } } @@ -2562,98 +2535,12 @@ nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, } } - nxt_router_gen_error(task, rc->conn, 500, - "Application terminated unexpectedly"); + nxt_http_request_error(task, rc->ap->request, NXT_HTTP_SERVICE_UNAVAILABLE); nxt_router_rc_unlink(task, rc); } -nxt_inline const char * -nxt_router_text_by_code(int code) -{ - switch (code) { - case 400: return "Bad request"; - case 404: return "Not found"; - case 403: return "Forbidden"; - case 408: return "Request Timeout"; - case 411: return "Length Required"; - case 413: return "Request Entity Too Large"; - case 500: - default: return "Internal server error"; - } -} - - -static nxt_buf_t * -nxt_router_get_error_buf(nxt_task_t *task, nxt_mp_t *mp, int code, - const char* str) -{ - nxt_buf_t *b, *last; - - b = nxt_buf_mem_alloc(mp, 16384, 0); - if (nxt_slow_path(b == NULL)) { - return NULL; - } - - b->mem.free = nxt_sprintf(b->mem.free, b->mem.end, - "HTTP/1.0 %d %s\r\n" - "Content-Type: text/plain\r\n" - "Connection: close\r\n\r\n", - code, nxt_router_text_by_code(code)); - - b->mem.free = nxt_cpymem(b->mem.free, str, nxt_strlen(str)); - - last = nxt_buf_sync_alloc(mp, NXT_BUF_SYNC_LAST); - - if (nxt_slow_path(last == NULL)) { - nxt_mp_free(mp, b); - return NULL; - } - - nxt_buf_chain_add(&b, last); - - return b; -} - - - -static void -nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code, - const char* str) -{ - nxt_mp_t *mp; - nxt_buf_t *b; - - /* TODO: fix when called in the middle of response */ - - nxt_log_alert(task->log, "error %d: %s", code, str); - - if (c->socket.fd == -1) { - return; - } - - mp = c->mem_pool; - - b = nxt_router_get_error_buf(task, mp, code, str); - if (nxt_slow_path(b == NULL)) { - return; - } - - if (c->write == NULL) { - c->write = b; - c->write_state = &nxt_router_conn_write_state; - - nxt_conn_write(task->thread->engine, c); - - } else { - nxt_debug(task, "router data attach out bufs to existing chain"); - - nxt_buf_chain_add(&c->write, b); - } -} - - static void nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) @@ -3228,283 +3115,22 @@ nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra) } -static void -nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data) -{ - size_t size; - nxt_int_t ret; - nxt_buf_t *buf; - nxt_conn_t *c; - nxt_sockaddr_t *local; - nxt_app_parse_ctx_t *ap; - nxt_app_request_body_t *b; - nxt_socket_conf_joint_t *joint; - nxt_app_request_header_t *h; - - c = obj; - ap = data; - buf = c->read; - joint = c->joint; - - nxt_debug(task, "router conn http header parse"); - - if (ap == NULL) { - ap = nxt_app_http_req_init(task); - if (nxt_slow_path(ap == NULL)) { - nxt_router_gen_error(task, c, 500, - "Failed to allocate parse context"); - return; - } - - c->socket.data = ap; - - ap->r.remote.start = nxt_sockaddr_address(c->remote); - ap->r.remote.length = c->remote->address_length; - - /* - * TODO: need an application flag to get local address - * required by "SERVER_ADDR" in Pyhton and PHP. Not used in Go. - */ - local = nxt_router_local_addr(task, c); - - if (nxt_fast_path(local != NULL)) { - ap->r.local.start = nxt_sockaddr_address(local); - ap->r.local.length = local->address_length; - } - - ap->r.header.buf = buf; - } - - h = &ap->r.header; - b = &ap->r.body; - - ret = nxt_app_http_req_header_parse(task, ap, buf); - - nxt_debug(task, "http parse request header: %d", ret); - - switch (nxt_expect(NXT_DONE, ret)) { - - case NXT_DONE: - nxt_debug(task, "router request header parsing complete, " - "content length: %O, preread: %uz", - h->parsed_content_length, nxt_buf_mem_used_size(&buf->mem)); - - if (b->done) { - nxt_router_process_http_request(task, c, ap); - - return; - } - - if (joint->socket_conf->max_body_size > 0 - && (size_t) h->parsed_content_length - > joint->socket_conf->max_body_size) - { - nxt_router_gen_error(task, c, 413, "Content-Length too big"); - return; - } - - if (nxt_buf_mem_free_size(&buf->mem) == 0) { - size = nxt_min(joint->socket_conf->body_buffer_size, - (size_t) h->parsed_content_length); - - buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0); - if (nxt_slow_path(buf->next == NULL)) { - nxt_router_gen_error(task, c, 500, "Failed to allocate " - "buffer for request body"); - return; - } - - c->read = buf->next; - - b->preread_size += nxt_buf_mem_used_size(&buf->mem); - } - - if (b->buf == NULL) { - b->buf = c->read; - } - - c->read_state = &nxt_router_conn_read_body_state; - break; - - case NXT_ERROR: - nxt_router_gen_error(task, c, 400, "Request header parse error"); - return; - - default: /* NXT_AGAIN */ - - if (c->read->mem.free == c->read->mem.end) { - size = joint->socket_conf->large_header_buffer_size; - - if (size <= (size_t) nxt_buf_mem_used_size(&buf->mem) - || ap->r.header.bufs - >= joint->socket_conf->large_header_buffers) - { - nxt_router_gen_error(task, c, 413, - "Too long request headers"); - return; - } - - buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0); - if (nxt_slow_path(buf->next == NULL)) { - nxt_router_gen_error(task, c, 500, - "Failed to allocate large header " - "buffer"); - return; - } - - ap->r.header.bufs++; - - size = c->read->mem.free - c->read->mem.pos; - - c->read = nxt_buf_cpy(buf->next, c->read->mem.pos, size); - } - - } - - nxt_conn_read(task->thread->engine, c); -} - - -static nxt_sockaddr_t * -nxt_router_local_addr(nxt_task_t *task, nxt_conn_t *c) -{ - int ret; - size_t size, length; - socklen_t socklen; - nxt_sockaddr_t *sa; - - if (c->local != NULL) { - return c->local; - } - - /* AF_UNIX should not get in here. */ - - switch (c->remote->u.sockaddr.sa_family) { -#if (NXT_INET6) - case AF_INET6: - socklen = sizeof(struct sockaddr_in6); - length = NXT_INET6_ADDR_STR_LEN; - size = offsetof(nxt_sockaddr_t, u) + socklen + length; - break; -#endif - case AF_INET: - default: - socklen = sizeof(struct sockaddr_in); - length = NXT_INET_ADDR_STR_LEN; - size = offsetof(nxt_sockaddr_t, u) + socklen + length; - break; - } - - sa = nxt_mp_get(c->mem_pool, size); - if (nxt_slow_path(sa == NULL)) { - return NULL; - } - - sa->socklen = socklen; - sa->length = length; - - ret = getsockname(c->socket.fd, &sa->u.sockaddr, &socklen); - if (nxt_slow_path(ret != 0)) { - nxt_log(task, NXT_LOG_CRIT, "getsockname(%d) failed", c->socket.fd); - return NULL; - } - - c->local = sa; - - nxt_sockaddr_text(sa); - - /* - * TODO: here we can adjust the end of non-freeable block - * in c->mem_pool to the end of actual sockaddr length. - */ - - return sa; -} - - -static void -nxt_router_conn_http_body_read(nxt_task_t *task, void *obj, void *data) -{ - size_t size; - nxt_int_t ret; - nxt_buf_t *buf; - nxt_conn_t *c; - nxt_app_parse_ctx_t *ap; - nxt_app_request_body_t *b; - nxt_socket_conf_joint_t *joint; - nxt_app_request_header_t *h; - - c = obj; - ap = data; - buf = c->read; - - nxt_debug(task, "router conn http body read"); - - nxt_assert(ap != NULL); - - b = &ap->r.body; - h = &ap->r.header; - - ret = nxt_app_http_req_body_read(task, ap, buf); - - nxt_debug(task, "http read request body: %d", ret); - - switch (nxt_expect(NXT_DONE, ret)) { - - case NXT_DONE: - nxt_router_process_http_request(task, c, ap); - return; - - case NXT_ERROR: - nxt_router_gen_error(task, c, 500, "Read body error"); - return; - - default: /* NXT_AGAIN */ - - if (nxt_buf_mem_free_size(&buf->mem) == 0) { - joint = c->joint; - - b->preread_size += nxt_buf_mem_used_size(&buf->mem); - - size = nxt_min(joint->socket_conf->body_buffer_size, - (size_t) h->parsed_content_length - b->preread_size); - - buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0); - if (nxt_slow_path(buf->next == NULL)) { - nxt_router_gen_error(task, c, 500, "Failed to allocate " - "buffer for request body"); - return; - } - - c->read = buf->next; - } - - nxt_debug(task, "router request body read again, rest: %uz", - h->parsed_content_length - b->preread_size); - } - - nxt_conn_read(task->thread->engine, c); -} - - -static void -nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, - nxt_app_parse_ctx_t *ap) +void +nxt_router_process_http_request(nxt_task_t *task, nxt_app_parse_ctx_t *ar) { - nxt_int_t res; - nxt_app_t *app; - nxt_port_t *port; - nxt_event_engine_t *engine; - nxt_req_app_link_t ra_local, *ra; - nxt_req_conn_link_t *rc; - nxt_socket_conf_joint_t *joint; + nxt_int_t res; + nxt_app_t *app; + nxt_port_t *port; + nxt_event_engine_t *engine; + nxt_http_request_t *r; + nxt_req_app_link_t ra_local, *ra; + nxt_req_conn_link_t *rc; - joint = c->joint; - app = joint->socket_conf->application; + r = ar->request; + app = r->socket_conf->application; if (app == NULL) { - nxt_router_gen_error(task, c, 500, - "Application is NULL in socket_conf"); + nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); return; } @@ -3516,27 +3142,16 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, sizeof(nxt_req_conn_link_t)); if (nxt_slow_path(rc == NULL)) { - nxt_router_gen_error(task, c, 500, "Failed to allocate " - "req<->conn link"); - + nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); return; } rc->stream = nxt_port_rpc_ex_stream(rc); - rc->conn = c; rc->app = app; nxt_router_app_use(task, app, 1); - nxt_timer_disable(engine, &c->read_timer); - - nxt_queue_insert_tail(&c->requests, &rc->link); - - nxt_debug(task, "stream #%uD linked to conn %p at engine %p", - rc->stream, c, engine); - - rc->ap = ap; - c->socket.data = NULL; + rc->ap = ar; ra = &ra_local; nxt_router_ra_init(task, ra, rc); @@ -3912,7 +3527,7 @@ fail: } -static const nxt_conn_state_t nxt_router_conn_close_state +const nxt_conn_state_t nxt_router_conn_close_state nxt_aligned(64) = { .ready_handler = nxt_router_conn_free, @@ -3920,75 +3535,6 @@ static const nxt_conn_state_t nxt_router_conn_close_state static void -nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data) -{ - nxt_buf_t *b; - nxt_bool_t last; - nxt_conn_t *c; - nxt_work_queue_t *wq; - - nxt_debug(task, "router conn ready %p", obj); - - c = obj; - b = c->write; - - wq = &task->thread->engine->fast_work_queue; - - last = 0; - - while (b != NULL) { - if (!nxt_buf_is_sync(b)) { - if (nxt_buf_used_size(b) > 0) { - break; - } - } - - if (nxt_buf_is_last(b)) { - last = 1; - } - - nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); - - b = b->next; - } - - c->write = b; - - if (b != NULL) { - nxt_debug(task, "router conn %p has more data to write", obj); - - nxt_conn_write(task->thread->engine, c); - - } else { - nxt_debug(task, "router conn %p no more data to write, last = %d", obj, - last); - - if (last != 0) { - nxt_debug(task, "enqueue router conn close %p (ready handler)", c); - - nxt_work_queue_add(wq, nxt_router_conn_close, task, c, - c->socket.data); - } - } -} - - -static void -nxt_router_conn_close(nxt_task_t *task, void *obj, void *data) -{ - nxt_conn_t *c; - - c = obj; - - nxt_debug(task, "router conn close"); - - c->write_state = &nxt_router_conn_close_state; - - nxt_conn_close(task->thread->engine, c); -} - - -static void nxt_router_conn_mp_cleanup(nxt_task_t *task, void *obj, void *data) { nxt_socket_conf_joint_t *joint; @@ -4004,31 +3550,12 @@ nxt_router_conn_free(nxt_task_t *task, void *obj, void *data) { nxt_conn_t *c; nxt_event_engine_t *engine; - nxt_req_conn_link_t *rc; - nxt_app_parse_ctx_t *ap; nxt_socket_conf_joint_t *joint; c = obj; - ap = data; nxt_debug(task, "router conn close done"); - if (ap != NULL) { - nxt_app_http_req_done(task, ap); - - c->socket.data = NULL; - } - - nxt_queue_each(rc, &c->requests, nxt_req_conn_link_t, link) { - - nxt_debug(task, "conn %p close, stream #%uD", c, rc->stream); - - nxt_router_rc_unlink(task, rc); - - nxt_port_rpc_cancel(task, task->thread->engine->port, rc->stream); - - } nxt_queue_loop; - nxt_queue_remove(&c->link); engine = task->thread->engine; @@ -4045,65 +3572,18 @@ nxt_router_conn_free(nxt_task_t *task, void *obj, void *data) static void -nxt_router_conn_error(nxt_task_t *task, void *obj, void *data) -{ - nxt_conn_t *c; - - c = obj; - - nxt_debug(task, "router conn error"); - - if (c->socket.fd != -1) { - c->write_state = &nxt_router_conn_close_state; - - nxt_conn_close(task->thread->engine, c); - } -} - - -static void -nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data) -{ - nxt_conn_t *c; - nxt_timer_t *timer; - - timer = obj; - - nxt_debug(task, "router conn timeout"); - - c = nxt_read_timer_conn(timer); - - if (c->read_state == &nxt_router_conn_read_header_state) { - nxt_router_gen_error(task, c, 408, "Read header timeout"); - - } else { - nxt_router_gen_error(task, c, 408, "Read body timeout"); - } -} - - -static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data) { - nxt_conn_t *c; - nxt_timer_t *timer; + nxt_timer_t *timer; + nxt_app_parse_ctx_t *ar; timer = obj; nxt_debug(task, "router app timeout"); - c = nxt_read_timer_conn(timer); - - nxt_router_gen_error(task, c, 408, "Application timeout"); -} - + ar = nxt_timer_data(timer, nxt_app_parse_ctx_t, timer); -static nxt_msec_t -nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data) -{ - nxt_socket_conf_joint_t *joint; - - joint = c->joint; - - return nxt_value_at(nxt_msec_t, joint->socket_conf, data); + if (!ar->request->header_sent) { + nxt_http_request_error(task, ar->request, NXT_HTTP_SERVICE_UNAVAILABLE); + } } diff --git a/src/nxt_router.h b/src/nxt_router.h index 76a04d6e..c405c8f3 100644 --- a/src/nxt_router.h +++ b/src/nxt_router.h @@ -14,6 +14,9 @@ #include <nxt_application.h> +typedef struct nxt_http_request_s nxt_http_request_t; + + typedef struct { nxt_thread_spinlock_t lock; nxt_queue_t engines; @@ -127,8 +130,10 @@ typedef struct { size_t large_header_buffers; size_t body_buffer_size; size_t max_body_size; + nxt_msec_t idle_timeout; nxt_msec_t header_read_timeout; nxt_msec_t body_read_timeout; + nxt_msec_t send_timeout; } nxt_socket_conf_t; @@ -146,7 +151,9 @@ void nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); void nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); void nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); +void nxt_router_process_http_request(nxt_task_t *task, nxt_app_parse_ctx_t *ar); void nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port); void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i); + #endif /* _NXT_ROUTER_H_INCLUDED_ */ diff --git a/src/nxt_sendbuf.c b/src/nxt_sendbuf.c index 1684f67c..b36c5edb 100644 --- a/src/nxt_sendbuf.c +++ b/src/nxt_sendbuf.c @@ -434,3 +434,23 @@ nxt_sendbuf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b, return b; } + + +nxt_buf_t * +nxt_sendbuf_completion0(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b) +{ + while (b != NULL) { + + nxt_prefetch(b->next); + + if (nxt_buf_used_size(b) != 0) { + break; + } + + nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); + + b = b->next; + } + + return b; +} diff --git a/src/nxt_sendbuf.h b/src/nxt_sendbuf.h index 3bcbaaf4..0406faf6 100644 --- a/src/nxt_sendbuf.h +++ b/src/nxt_sendbuf.h @@ -124,6 +124,8 @@ ssize_t nxt_sendbuf_copy_coalesce(nxt_conn_t *c, nxt_buf_mem_t *bm, nxt_buf_t *nxt_sendbuf_update(nxt_buf_t *b, size_t sent); nxt_buf_t *nxt_sendbuf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b, size_t sent, nxt_bool_t mmap_mode); +nxt_buf_t *nxt_sendbuf_completion0(nxt_task_t *task, nxt_work_queue_t *wq, + nxt_buf_t *b); #endif /* _NXT_SENDBUF_H_INCLUDED_ */ |