diff options
author | Igor Sysoev <igor@sysoev.ru> | 2017-12-28 16:01:06 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2017-12-28 16:01:06 +0300 |
commit | 9a6d3c5775d945509c7c2cbec48be59757da42c3 (patch) | |
tree | 7129c13d6027a45e1a324deab373125bd7a14794 /src/nxt_router.c | |
parent | 497faf1b9abb188cab40c389a9e6221add5dd496 (diff) | |
download | unit-9a6d3c5775d945509c7c2cbec48be59757da42c3.tar.gz unit-9a6d3c5775d945509c7c2cbec48be59757da42c3.tar.bz2 |
HTTP keep-alive connections support.
Diffstat (limited to 'src/nxt_router.c')
-rw-r--r-- | src/nxt_router.c | 760 |
1 files changed, 120 insertions, 640 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c index 66ad6478..54e70a1a 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -7,6 +7,7 @@ #include <nxt_router.h> #include <nxt_conf.h> +#include <nxt_http.h> typedef struct { @@ -35,15 +36,14 @@ typedef struct nxt_req_app_link_s nxt_req_app_link_t; typedef struct { - uint32_t stream; - nxt_conn_t *conn; - nxt_app_t *app; - nxt_port_t *app_port; - nxt_app_parse_ctx_t *ap; - nxt_msg_info_t msg_info; - nxt_req_app_link_t *ra; + uint32_t stream; + nxt_app_t *app; + nxt_port_t *app_port; + nxt_app_parse_ctx_t *ap; + nxt_msg_info_t msg_info; + nxt_req_app_link_t *ra; - nxt_queue_link_t link; /* for nxt_conn_t.requests */ + nxt_queue_link_t link; /* for nxt_conn_t.requests */ } nxt_req_conn_link_t; @@ -199,14 +199,6 @@ static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra); -static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data); -static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, - void *data); -static nxt_sockaddr_t *nxt_router_local_addr(nxt_task_t *task, nxt_conn_t *c); -static void nxt_router_conn_http_body_read(nxt_task_t *task, void *obj, - void *data); -static void nxt_router_process_http_request(nxt_task_t *task, - nxt_conn_t *c, nxt_app_parse_ctx_t *ap); static void nxt_router_app_prepare_request(nxt_task_t *task, nxt_req_app_link_t *ra); static nxt_int_t nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, @@ -215,16 +207,11 @@ static nxt_int_t nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, nxt_app_wmsg_t *wmsg); static nxt_int_t nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, nxt_app_wmsg_t *wmsg); -static void nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data); -static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data); -static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data); -static void nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data); static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data); -static nxt_msec_t nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data); -static void nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code, - const char* str); +static const nxt_http_request_state_t nxt_http_request_send_state; +static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data); static nxt_router_t *nxt_router; @@ -245,7 +232,7 @@ nxt_router_start(nxt_task_t *task, void *data) rt = task->thread->runtime; - ret = nxt_app_http_init(task, rt); + ret = nxt_http_init(task, rt); if (nxt_slow_path(ret != NXT_OK)) { return ret; } @@ -502,9 +489,8 @@ nxt_router_ra_update_peer(nxt_task_t *task, nxt_req_app_link_t *ra) static void nxt_router_ra_release(nxt_task_t *task, nxt_req_app_link_t *ra) { - nxt_mp_t *mp; - nxt_conn_t *c; - nxt_req_conn_link_t *rc; + nxt_mp_t *mp; + nxt_req_conn_link_t *rc; nxt_assert(task->thread->engine == ra->work.data); nxt_assert(ra->use_count == 0); @@ -514,18 +500,16 @@ nxt_router_ra_release(nxt_task_t *task, nxt_req_app_link_t *ra) rc = ra->rc; if (rc != NULL) { - c = rc->conn; - if (nxt_slow_path(ra->err_code != 0)) { - nxt_router_gen_error(task, c, ra->err_code, ra->err_str); + nxt_http_request_error(task, rc->ap->request, ra->err_code); } else { rc->app_port = ra->app_port; rc->msg_info = ra->msg_info; if (rc->app->timeout != 0) { - c->read_timer.handler = nxt_router_app_timeout; - nxt_timer_add(task->thread->engine, &c->read_timer, + rc->ap->timer.handler = nxt_router_app_timeout; + nxt_timer_add(task->thread->engine, &rc->ap->timer, rc->app->timeout); } @@ -693,10 +677,6 @@ nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc) rc->ap = NULL; } - - nxt_queue_remove(&rc->link); - - rc->conn = NULL; } @@ -1079,6 +1059,12 @@ static nxt_conf_map_t nxt_router_http_conf[] = { }, { + nxt_string("idle_timeout"), + NXT_CONF_MAP_MSEC, + offsetof(nxt_socket_conf_t, idle_timeout), + }, + + { nxt_string("header_read_timeout"), NXT_CONF_MAP_MSEC, offsetof(nxt_socket_conf_t, header_read_timeout), @@ -1089,6 +1075,12 @@ static nxt_conf_map_t nxt_router_http_conf[] = { NXT_CONF_MAP_MSEC, offsetof(nxt_socket_conf_t, body_read_timeout), }, + + { + nxt_string("send_timeout"), + NXT_CONF_MAP_MSEC, + offsetof(nxt_socket_conf_t, send_timeout), + }, }; @@ -1296,8 +1288,10 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, skcf->large_header_buffers = 4; skcf->body_buffer_size = 16 * 1024; skcf->max_body_size = 2 * 1024 * 1024; + skcf->idle_timeout = 65000; skcf->header_read_timeout = 5000; skcf->body_read_timeout = 5000; + skcf->send_timeout = 5000; if (http != NULL) { ret = nxt_conf_map_object(mp, http, nxt_router_http_conf, @@ -1308,7 +1302,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, } } - skcf->listen->handler = nxt_router_conn_init; + skcf->listen->handler = nxt_http_conn_init; skcf->router_conf = tmcf->conf; skcf->router_conf->count++; skcf->application = nxt_router_listener_application(tmcf, @@ -2377,92 +2371,20 @@ nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data) } -static const nxt_conn_state_t nxt_router_conn_read_header_state - nxt_aligned(64) = -{ - .ready_handler = nxt_router_conn_http_header_parse, - .close_handler = nxt_router_conn_close, - .error_handler = nxt_router_conn_error, - - .timer_handler = nxt_router_conn_timeout, - .timer_value = nxt_router_conn_timeout_value, - .timer_data = offsetof(nxt_socket_conf_t, header_read_timeout), -}; - - -static const nxt_conn_state_t nxt_router_conn_read_body_state - nxt_aligned(64) = -{ - .ready_handler = nxt_router_conn_http_body_read, - .close_handler = nxt_router_conn_close, - .error_handler = nxt_router_conn_error, - - .timer_handler = nxt_router_conn_timeout, - .timer_value = nxt_router_conn_timeout_value, - .timer_data = offsetof(nxt_socket_conf_t, body_read_timeout), - .timer_autoreset = 1, -}; - - -static void -nxt_router_conn_init(nxt_task_t *task, void *obj, void *data) -{ - size_t size; - nxt_conn_t *c; - nxt_socket_conf_t *skcf; - nxt_event_engine_t *engine; - nxt_socket_conf_joint_t *joint; - - c = obj; - joint = data; - - nxt_debug(task, "router conn init"); - - c->joint = joint; - joint->count++; - - skcf = joint->socket_conf; - c->local = skcf->sockaddr; - - size = skcf->header_buffer_size; - c->read = nxt_buf_mem_alloc(c->mem_pool, size, 0); - - c->socket.data = NULL; - - engine = task->thread->engine; - c->read_work_queue = &engine->fast_work_queue; - c->write_work_queue = &engine->fast_work_queue; - - c->read_state = &nxt_router_conn_read_header_state; - - nxt_conn_read(engine, c); -} - - -static const nxt_conn_state_t nxt_router_conn_write_state - nxt_aligned(64) = -{ - .ready_handler = nxt_router_conn_ready, - .close_handler = nxt_router_conn_close, - .error_handler = nxt_router_conn_error, -}; - - static void nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) { size_t dump_size; + nxt_int_t ret; nxt_buf_t *b, *last; - nxt_conn_t *c; - nxt_event_engine_t *engine; + nxt_http_request_t *r; nxt_req_conn_link_t *rc; + nxt_app_parse_ctx_t *ar; b = msg->buf; rc = data; - c = rc->conn; - dump_size = nxt_buf_used_size(b); if (dump_size > 300) { @@ -2477,16 +2399,16 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, b = NULL; } - engine = task->thread->engine; - - nxt_timer_disable(engine, &c->read_timer); + ar = rc->ap; if (msg->port_msg.last != 0) { nxt_debug(task, "router data create last buf"); - last = nxt_buf_sync_alloc(c->mem_pool, NXT_BUF_SYNC_LAST); + last = nxt_http_request_last_buffer(task, ar->request); if (nxt_slow_path(last == NULL)) { - /* TODO pogorevaTb */ + nxt_app_http_req_done(task, ar); + nxt_router_rc_unlink(task, rc); + return; } nxt_buf_chain_add(&b, last); @@ -2495,8 +2417,8 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, } else { if (rc->app->timeout != 0) { - c->read_timer.handler = nxt_router_app_timeout; - nxt_timer_add(engine, &c->read_timer, rc->app->timeout); + ar->timer.handler = nxt_router_app_timeout; + nxt_timer_add(task->thread->engine, &ar->timer, rc->app->timeout); } } @@ -2509,16 +2431,67 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, msg->buf = NULL; } - if (c->write == NULL) { - c->write = b; - c->write_state = &nxt_router_conn_write_state; + r = ar->request; - nxt_conn_write(task->thread->engine, c); + if (r->header_sent) { + nxt_buf_chain_add(&r->out, b); + nxt_http_request_send_body(task, r, NULL); } else { - nxt_debug(task, "router data attach out bufs to existing chain"); + ret = nxt_http_parse_fields(&ar->resp_parser, &b->mem); + if (nxt_slow_path(ret != NXT_DONE)) { + goto fail; + } + + r->resp.fields = ar->resp_parser.fields; - nxt_buf_chain_add(&c->write, b); + ret = nxt_http_fields_process(r->resp.fields, + &nxt_response_fields_hash, r); + if (nxt_slow_path(ret != NXT_OK)) { + goto fail; + } + + if (nxt_buf_mem_used_size(&b->mem) != 0) { + nxt_buf_chain_add(&r->out, b); + } + + r->state = &nxt_http_request_send_state; + + nxt_http_request_header_send(task, r); + } + + return; + +fail: + + nxt_app_http_req_done(task, ar); + nxt_router_rc_unlink(task, rc); + + nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE); +} + + +static const nxt_http_request_state_t nxt_http_request_send_state + nxt_aligned(64) = +{ + .ready_handler = nxt_http_request_send_body, + .error_handler = nxt_http_request_close_handler, +}; + + +static void +nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data) +{ + nxt_buf_t *out; + nxt_http_request_t *r; + + r = obj; + + out = r->out; + + if (out != NULL) { + r->out = NULL; + nxt_http_request_send(task, r, out); } } @@ -2562,98 +2535,12 @@ nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, } } - nxt_router_gen_error(task, rc->conn, 500, - "Application terminated unexpectedly"); + nxt_http_request_error(task, rc->ap->request, NXT_HTTP_SERVICE_UNAVAILABLE); nxt_router_rc_unlink(task, rc); } -nxt_inline const char * -nxt_router_text_by_code(int code) -{ - switch (code) { - case 400: return "Bad request"; - case 404: return "Not found"; - case 403: return "Forbidden"; - case 408: return "Request Timeout"; - case 411: return "Length Required"; - case 413: return "Request Entity Too Large"; - case 500: - default: return "Internal server error"; - } -} - - -static nxt_buf_t * -nxt_router_get_error_buf(nxt_task_t *task, nxt_mp_t *mp, int code, - const char* str) -{ - nxt_buf_t *b, *last; - - b = nxt_buf_mem_alloc(mp, 16384, 0); - if (nxt_slow_path(b == NULL)) { - return NULL; - } - - b->mem.free = nxt_sprintf(b->mem.free, b->mem.end, - "HTTP/1.0 %d %s\r\n" - "Content-Type: text/plain\r\n" - "Connection: close\r\n\r\n", - code, nxt_router_text_by_code(code)); - - b->mem.free = nxt_cpymem(b->mem.free, str, nxt_strlen(str)); - - last = nxt_buf_sync_alloc(mp, NXT_BUF_SYNC_LAST); - - if (nxt_slow_path(last == NULL)) { - nxt_mp_free(mp, b); - return NULL; - } - - nxt_buf_chain_add(&b, last); - - return b; -} - - - -static void -nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code, - const char* str) -{ - nxt_mp_t *mp; - nxt_buf_t *b; - - /* TODO: fix when called in the middle of response */ - - nxt_log_alert(task->log, "error %d: %s", code, str); - - if (c->socket.fd == -1) { - return; - } - - mp = c->mem_pool; - - b = nxt_router_get_error_buf(task, mp, code, str); - if (nxt_slow_path(b == NULL)) { - return; - } - - if (c->write == NULL) { - c->write = b; - c->write_state = &nxt_router_conn_write_state; - - nxt_conn_write(task->thread->engine, c); - - } else { - nxt_debug(task, "router data attach out bufs to existing chain"); - - nxt_buf_chain_add(&c->write, b); - } -} - - static void nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) @@ -3228,283 +3115,22 @@ nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra) } -static void -nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data) -{ - size_t size; - nxt_int_t ret; - nxt_buf_t *buf; - nxt_conn_t *c; - nxt_sockaddr_t *local; - nxt_app_parse_ctx_t *ap; - nxt_app_request_body_t *b; - nxt_socket_conf_joint_t *joint; - nxt_app_request_header_t *h; - - c = obj; - ap = data; - buf = c->read; - joint = c->joint; - - nxt_debug(task, "router conn http header parse"); - - if (ap == NULL) { - ap = nxt_app_http_req_init(task); - if (nxt_slow_path(ap == NULL)) { - nxt_router_gen_error(task, c, 500, - "Failed to allocate parse context"); - return; - } - - c->socket.data = ap; - - ap->r.remote.start = nxt_sockaddr_address(c->remote); - ap->r.remote.length = c->remote->address_length; - - /* - * TODO: need an application flag to get local address - * required by "SERVER_ADDR" in Pyhton and PHP. Not used in Go. - */ - local = nxt_router_local_addr(task, c); - - if (nxt_fast_path(local != NULL)) { - ap->r.local.start = nxt_sockaddr_address(local); - ap->r.local.length = local->address_length; - } - - ap->r.header.buf = buf; - } - - h = &ap->r.header; - b = &ap->r.body; - - ret = nxt_app_http_req_header_parse(task, ap, buf); - - nxt_debug(task, "http parse request header: %d", ret); - - switch (nxt_expect(NXT_DONE, ret)) { - - case NXT_DONE: - nxt_debug(task, "router request header parsing complete, " - "content length: %O, preread: %uz", - h->parsed_content_length, nxt_buf_mem_used_size(&buf->mem)); - - if (b->done) { - nxt_router_process_http_request(task, c, ap); - - return; - } - - if (joint->socket_conf->max_body_size > 0 - && (size_t) h->parsed_content_length - > joint->socket_conf->max_body_size) - { - nxt_router_gen_error(task, c, 413, "Content-Length too big"); - return; - } - - if (nxt_buf_mem_free_size(&buf->mem) == 0) { - size = nxt_min(joint->socket_conf->body_buffer_size, - (size_t) h->parsed_content_length); - - buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0); - if (nxt_slow_path(buf->next == NULL)) { - nxt_router_gen_error(task, c, 500, "Failed to allocate " - "buffer for request body"); - return; - } - - c->read = buf->next; - - b->preread_size += nxt_buf_mem_used_size(&buf->mem); - } - - if (b->buf == NULL) { - b->buf = c->read; - } - - c->read_state = &nxt_router_conn_read_body_state; - break; - - case NXT_ERROR: - nxt_router_gen_error(task, c, 400, "Request header parse error"); - return; - - default: /* NXT_AGAIN */ - - if (c->read->mem.free == c->read->mem.end) { - size = joint->socket_conf->large_header_buffer_size; - - if (size <= (size_t) nxt_buf_mem_used_size(&buf->mem) - || ap->r.header.bufs - >= joint->socket_conf->large_header_buffers) - { - nxt_router_gen_error(task, c, 413, - "Too long request headers"); - return; - } - - buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0); - if (nxt_slow_path(buf->next == NULL)) { - nxt_router_gen_error(task, c, 500, - "Failed to allocate large header " - "buffer"); - return; - } - - ap->r.header.bufs++; - - size = c->read->mem.free - c->read->mem.pos; - - c->read = nxt_buf_cpy(buf->next, c->read->mem.pos, size); - } - - } - - nxt_conn_read(task->thread->engine, c); -} - - -static nxt_sockaddr_t * -nxt_router_local_addr(nxt_task_t *task, nxt_conn_t *c) -{ - int ret; - size_t size, length; - socklen_t socklen; - nxt_sockaddr_t *sa; - - if (c->local != NULL) { - return c->local; - } - - /* AF_UNIX should not get in here. */ - - switch (c->remote->u.sockaddr.sa_family) { -#if (NXT_INET6) - case AF_INET6: - socklen = sizeof(struct sockaddr_in6); - length = NXT_INET6_ADDR_STR_LEN; - size = offsetof(nxt_sockaddr_t, u) + socklen + length; - break; -#endif - case AF_INET: - default: - socklen = sizeof(struct sockaddr_in); - length = NXT_INET_ADDR_STR_LEN; - size = offsetof(nxt_sockaddr_t, u) + socklen + length; - break; - } - - sa = nxt_mp_get(c->mem_pool, size); - if (nxt_slow_path(sa == NULL)) { - return NULL; - } - - sa->socklen = socklen; - sa->length = length; - - ret = getsockname(c->socket.fd, &sa->u.sockaddr, &socklen); - if (nxt_slow_path(ret != 0)) { - nxt_log(task, NXT_LOG_CRIT, "getsockname(%d) failed", c->socket.fd); - return NULL; - } - - c->local = sa; - - nxt_sockaddr_text(sa); - - /* - * TODO: here we can adjust the end of non-freeable block - * in c->mem_pool to the end of actual sockaddr length. - */ - - return sa; -} - - -static void -nxt_router_conn_http_body_read(nxt_task_t *task, void *obj, void *data) -{ - size_t size; - nxt_int_t ret; - nxt_buf_t *buf; - nxt_conn_t *c; - nxt_app_parse_ctx_t *ap; - nxt_app_request_body_t *b; - nxt_socket_conf_joint_t *joint; - nxt_app_request_header_t *h; - - c = obj; - ap = data; - buf = c->read; - - nxt_debug(task, "router conn http body read"); - - nxt_assert(ap != NULL); - - b = &ap->r.body; - h = &ap->r.header; - - ret = nxt_app_http_req_body_read(task, ap, buf); - - nxt_debug(task, "http read request body: %d", ret); - - switch (nxt_expect(NXT_DONE, ret)) { - - case NXT_DONE: - nxt_router_process_http_request(task, c, ap); - return; - - case NXT_ERROR: - nxt_router_gen_error(task, c, 500, "Read body error"); - return; - - default: /* NXT_AGAIN */ - - if (nxt_buf_mem_free_size(&buf->mem) == 0) { - joint = c->joint; - - b->preread_size += nxt_buf_mem_used_size(&buf->mem); - - size = nxt_min(joint->socket_conf->body_buffer_size, - (size_t) h->parsed_content_length - b->preread_size); - - buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0); - if (nxt_slow_path(buf->next == NULL)) { - nxt_router_gen_error(task, c, 500, "Failed to allocate " - "buffer for request body"); - return; - } - - c->read = buf->next; - } - - nxt_debug(task, "router request body read again, rest: %uz", - h->parsed_content_length - b->preread_size); - } - - nxt_conn_read(task->thread->engine, c); -} - - -static void -nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, - nxt_app_parse_ctx_t *ap) +void +nxt_router_process_http_request(nxt_task_t *task, nxt_app_parse_ctx_t *ar) { - nxt_int_t res; - nxt_app_t *app; - nxt_port_t *port; - nxt_event_engine_t *engine; - nxt_req_app_link_t ra_local, *ra; - nxt_req_conn_link_t *rc; - nxt_socket_conf_joint_t *joint; + nxt_int_t res; + nxt_app_t *app; + nxt_port_t *port; + nxt_event_engine_t *engine; + nxt_http_request_t *r; + nxt_req_app_link_t ra_local, *ra; + nxt_req_conn_link_t *rc; - joint = c->joint; - app = joint->socket_conf->application; + r = ar->request; + app = r->socket_conf->application; if (app == NULL) { - nxt_router_gen_error(task, c, 500, - "Application is NULL in socket_conf"); + nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); return; } @@ -3516,27 +3142,16 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, sizeof(nxt_req_conn_link_t)); if (nxt_slow_path(rc == NULL)) { - nxt_router_gen_error(task, c, 500, "Failed to allocate " - "req<->conn link"); - + nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); return; } rc->stream = nxt_port_rpc_ex_stream(rc); - rc->conn = c; rc->app = app; nxt_router_app_use(task, app, 1); - nxt_timer_disable(engine, &c->read_timer); - - nxt_queue_insert_tail(&c->requests, &rc->link); - - nxt_debug(task, "stream #%uD linked to conn %p at engine %p", - rc->stream, c, engine); - - rc->ap = ap; - c->socket.data = NULL; + rc->ap = ar; ra = &ra_local; nxt_router_ra_init(task, ra, rc); @@ -3912,7 +3527,7 @@ fail: } -static const nxt_conn_state_t nxt_router_conn_close_state +const nxt_conn_state_t nxt_router_conn_close_state nxt_aligned(64) = { .ready_handler = nxt_router_conn_free, @@ -3920,75 +3535,6 @@ static const nxt_conn_state_t nxt_router_conn_close_state static void -nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data) -{ - nxt_buf_t *b; - nxt_bool_t last; - nxt_conn_t *c; - nxt_work_queue_t *wq; - - nxt_debug(task, "router conn ready %p", obj); - - c = obj; - b = c->write; - - wq = &task->thread->engine->fast_work_queue; - - last = 0; - - while (b != NULL) { - if (!nxt_buf_is_sync(b)) { - if (nxt_buf_used_size(b) > 0) { - break; - } - } - - if (nxt_buf_is_last(b)) { - last = 1; - } - - nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); - - b = b->next; - } - - c->write = b; - - if (b != NULL) { - nxt_debug(task, "router conn %p has more data to write", obj); - - nxt_conn_write(task->thread->engine, c); - - } else { - nxt_debug(task, "router conn %p no more data to write, last = %d", obj, - last); - - if (last != 0) { - nxt_debug(task, "enqueue router conn close %p (ready handler)", c); - - nxt_work_queue_add(wq, nxt_router_conn_close, task, c, - c->socket.data); - } - } -} - - -static void -nxt_router_conn_close(nxt_task_t *task, void *obj, void *data) -{ - nxt_conn_t *c; - - c = obj; - - nxt_debug(task, "router conn close"); - - c->write_state = &nxt_router_conn_close_state; - - nxt_conn_close(task->thread->engine, c); -} - - -static void nxt_router_conn_mp_cleanup(nxt_task_t *task, void *obj, void *data) { nxt_socket_conf_joint_t *joint; @@ -4004,31 +3550,12 @@ nxt_router_conn_free(nxt_task_t *task, void *obj, void *data) { nxt_conn_t *c; nxt_event_engine_t *engine; - nxt_req_conn_link_t *rc; - nxt_app_parse_ctx_t *ap; nxt_socket_conf_joint_t *joint; c = obj; - ap = data; nxt_debug(task, "router conn close done"); - if (ap != NULL) { - nxt_app_http_req_done(task, ap); - - c->socket.data = NULL; - } - - nxt_queue_each(rc, &c->requests, nxt_req_conn_link_t, link) { - - nxt_debug(task, "conn %p close, stream #%uD", c, rc->stream); - - nxt_router_rc_unlink(task, rc); - - nxt_port_rpc_cancel(task, task->thread->engine->port, rc->stream); - - } nxt_queue_loop; - nxt_queue_remove(&c->link); engine = task->thread->engine; @@ -4045,65 +3572,18 @@ nxt_router_conn_free(nxt_task_t *task, void *obj, void *data) static void -nxt_router_conn_error(nxt_task_t *task, void *obj, void *data) -{ - nxt_conn_t *c; - - c = obj; - - nxt_debug(task, "router conn error"); - - if (c->socket.fd != -1) { - c->write_state = &nxt_router_conn_close_state; - - nxt_conn_close(task->thread->engine, c); - } -} - - -static void -nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data) -{ - nxt_conn_t *c; - nxt_timer_t *timer; - - timer = obj; - - nxt_debug(task, "router conn timeout"); - - c = nxt_read_timer_conn(timer); - - if (c->read_state == &nxt_router_conn_read_header_state) { - nxt_router_gen_error(task, c, 408, "Read header timeout"); - - } else { - nxt_router_gen_error(task, c, 408, "Read body timeout"); - } -} - - -static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data) { - nxt_conn_t *c; - nxt_timer_t *timer; + nxt_timer_t *timer; + nxt_app_parse_ctx_t *ar; timer = obj; nxt_debug(task, "router app timeout"); - c = nxt_read_timer_conn(timer); - - nxt_router_gen_error(task, c, 408, "Application timeout"); -} - + ar = nxt_timer_data(timer, nxt_app_parse_ctx_t, timer); -static nxt_msec_t -nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data) -{ - nxt_socket_conf_joint_t *joint; - - joint = c->joint; - - return nxt_value_at(nxt_msec_t, joint->socket_conf, data); + if (!ar->request->header_sent) { + nxt_http_request_error(task, ar->request, NXT_HTTP_SERVICE_UNAVAILABLE); + } } |