diff options
author | Igor Sysoev <igor@sysoev.ru> | 2019-11-14 16:39:54 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2019-11-14 16:39:54 +0300 |
commit | ddde9c23cf302309d85ccc24b35075ce94da89a0 (patch) | |
tree | 2bf729e8f0315cd4c775be572c0f01688d11bf03 /src | |
parent | 57e326b4119863f737d8677adc69dc53c7e4ed27 (diff) | |
download | unit-ddde9c23cf302309d85ccc24b35075ce94da89a0.tar.gz unit-ddde9c23cf302309d85ccc24b35075ce94da89a0.tar.bz2 |
Initial proxy support.
Diffstat (limited to '')
-rw-r--r-- | src/nxt_conf_validation.c | 41 | ||||
-rw-r--r-- | src/nxt_h1proto.c | 787 | ||||
-rw-r--r-- | src/nxt_h1proto.h | 2 | ||||
-rw-r--r-- | src/nxt_http.h | 41 | ||||
-rw-r--r-- | src/nxt_http_error.c | 4 | ||||
-rw-r--r-- | src/nxt_http_parse.c | 1 | ||||
-rw-r--r-- | src/nxt_http_parse.h | 3 | ||||
-rw-r--r-- | src/nxt_http_proxy.c | 380 | ||||
-rw-r--r-- | src/nxt_http_request.c | 4 | ||||
-rw-r--r-- | src/nxt_http_route.c | 20 | ||||
-rw-r--r-- | src/nxt_http_static.c | 2 | ||||
-rw-r--r-- | src/nxt_router.c | 9 | ||||
-rw-r--r-- | src/nxt_router.h | 7 | ||||
-rw-r--r-- | src/nxt_socket.c | 3 | ||||
-rw-r--r-- | src/nxt_unit_field.h | 3 |
15 files changed, 1263 insertions, 44 deletions
diff --git a/src/nxt_conf_validation.c b/src/nxt_conf_validation.c index 3cfecbad..105af675 100644 --- a/src/nxt_conf_validation.c +++ b/src/nxt_conf_validation.c @@ -62,6 +62,8 @@ static nxt_int_t nxt_conf_vldt_action(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data); static nxt_int_t nxt_conf_vldt_pass(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data); +static nxt_int_t nxt_conf_vldt_proxy(nxt_conf_validation_t *vldt, + nxt_conf_value_t *value, void *data); static nxt_int_t nxt_conf_vldt_routes(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data); static nxt_int_t nxt_conf_vldt_routes_member(nxt_conf_validation_t *vldt, @@ -316,6 +318,11 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_action_members[] = { NULL, NULL }, + { nxt_string("proxy"), + NXT_CONF_VLDT_STRING, + &nxt_conf_vldt_proxy, + NULL }, + NXT_CONF_VLDT_END }; @@ -885,10 +892,11 @@ nxt_conf_vldt_action(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data) { nxt_int_t ret; - nxt_conf_value_t *pass_value, *share_value; + nxt_conf_value_t *pass_value, *share_value, *proxy_value; static nxt_str_t pass_str = nxt_string("pass"); static nxt_str_t share_str = nxt_string("share"); + static nxt_str_t proxy_str = nxt_string("proxy"); ret = nxt_conf_vldt_object(vldt, value, nxt_conf_vldt_action_members); @@ -898,11 +906,12 @@ nxt_conf_vldt_action(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, pass_value = nxt_conf_get_object_member(value, &pass_str, NULL); share_value = nxt_conf_get_object_member(value, &share_str, NULL); + proxy_value = nxt_conf_get_object_member(value, &proxy_str, NULL); - if (pass_value == NULL && share_value == NULL) { + if (pass_value == NULL && share_value == NULL && proxy_value == NULL) { return nxt_conf_vldt_error(vldt, "The \"action\" object must have " - "either \"pass\" or \"share\" " - "option set."); + "either \"pass\" or \"share\" or " + "\"proxy\" option set."); } return NXT_OK; @@ -993,6 +1002,30 @@ error: static nxt_int_t +nxt_conf_vldt_proxy(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, + void *data) +{ + nxt_str_t name; + nxt_sockaddr_t *sa; + + nxt_conf_get_string(value, &name); + + if (nxt_str_start(&name, "http://", 7)) { + name.length -= 7; + name.start += 7; + + sa = nxt_sockaddr_parse(vldt->pool, &name); + if (sa != NULL) { + return NXT_OK; + } + } + + return nxt_conf_vldt_error(vldt, "The \"proxy\" address is invalid \"%V\"", + &name); +} + + +static nxt_int_t nxt_conf_vldt_routes(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data) { diff --git a/src/nxt_h1proto.c b/src/nxt_h1proto.c index c2866ccf..064b8f38 100644 --- a/src/nxt_h1proto.c +++ b/src/nxt_h1proto.c @@ -45,7 +45,7 @@ static void nxt_h1p_conn_request_body_read(nxt_task_t *task, void *obj, void *data); static void nxt_h1p_request_local_addr(nxt_task_t *task, nxt_http_request_t *r); static void nxt_h1p_request_header_send(nxt_task_t *task, - nxt_http_request_t *r, nxt_work_handler_t body_handler); + nxt_http_request_t *r, nxt_work_handler_t body_handler, void *data); static void nxt_h1p_request_send(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *out); static nxt_buf_t *nxt_h1p_chunk_create(nxt_task_t *task, nxt_http_request_t *r, @@ -78,11 +78,32 @@ static void nxt_h1p_idle_response_timeout(nxt_task_t *task, void *obj, static nxt_msec_t nxt_h1p_idle_response_timer_value(nxt_conn_t *c, uintptr_t data); static void nxt_h1p_shutdown(nxt_task_t *task, nxt_conn_t *c); -static void nxt_h1p_shutdown_(nxt_task_t *task, nxt_conn_t *c); +static void nxt_h1p_closing(nxt_task_t *task, nxt_conn_t *c); static void nxt_h1p_conn_ws_shutdown(nxt_task_t *task, void *obj, void *data); static void nxt_h1p_conn_closing(nxt_task_t *task, void *obj, void *data); static void nxt_h1p_conn_free(nxt_task_t *task, void *obj, void *data); +static void nxt_h1p_peer_connect(nxt_task_t *task, nxt_http_peer_t *peer); +static void nxt_h1p_peer_connected(nxt_task_t *task, void *obj, void *data); +static void nxt_h1p_peer_refused(nxt_task_t *task, void *obj, void *data); +static void nxt_h1p_peer_header_send(nxt_task_t *task, nxt_http_peer_t *peer); +static void nxt_h1p_peer_header_sent(nxt_task_t *task, void *obj, void *data); +static void nxt_h1p_peer_header_read(nxt_task_t *task, nxt_http_peer_t *peer); +static ssize_t nxt_h1p_peer_io_read_handler(nxt_task_t *task, nxt_conn_t *c); +static void nxt_h1p_peer_header_read_done(nxt_task_t *task, void *obj, + void *data); +static nxt_int_t nxt_h1p_peer_header_parse(nxt_http_peer_t *peer, + nxt_buf_mem_t *bm); +static void nxt_h1p_peer_read(nxt_task_t *task, nxt_http_peer_t *peer); +static void nxt_h1p_peer_read_done(nxt_task_t *task, void *obj, void *data); +static void nxt_h1p_peer_closed(nxt_task_t *task, void *obj, void *data); +static void nxt_h1p_peer_error(nxt_task_t *task, void *obj, void *data); +static void nxt_h1p_peer_send_timeout(nxt_task_t *task, void *obj, void *data); +static void nxt_h1p_peer_read_timeout(nxt_task_t *task, void *obj, void *data); +static nxt_msec_t nxt_h1p_peer_timer_value(nxt_conn_t *c, uintptr_t data); +static void nxt_h1p_peer_close(nxt_task_t *task, nxt_http_peer_t *peer); +static void nxt_h1p_peer_free(nxt_task_t *task, void *obj, void *data); + #if (NXT_TLS) static const nxt_conn_state_t nxt_http_idle_state; static const nxt_conn_state_t nxt_h1p_shutdown_state; @@ -94,6 +115,13 @@ static const nxt_conn_state_t nxt_h1p_request_send_state; static const nxt_conn_state_t nxt_h1p_timeout_response_state; static const nxt_conn_state_t nxt_h1p_keepalive_state; static const nxt_conn_state_t nxt_h1p_close_state; +static const nxt_conn_state_t nxt_h1p_peer_connect_state; +static const nxt_conn_state_t nxt_h1p_peer_header_send_state; +static const nxt_conn_state_t nxt_h1p_peer_header_body_send_state; +static const nxt_conn_state_t nxt_h1p_peer_header_read_state; +static const nxt_conn_state_t nxt_h1p_peer_header_read_timer_state; +static const nxt_conn_state_t nxt_h1p_peer_read_state; +static const nxt_conn_state_t nxt_h1p_peer_close_state; const nxt_http_proto_table_t nxt_http_proto[3] = { @@ -106,6 +134,13 @@ const nxt_http_proto_table_t nxt_http_proto[3] = { .body_bytes_sent = nxt_h1p_request_body_bytes_sent, .discard = nxt_h1p_request_discard, .close = nxt_h1p_request_close, + + .peer_connect = nxt_h1p_peer_connect, + .peer_header_send = nxt_h1p_peer_header_send, + .peer_header_read = nxt_h1p_peer_header_read, + .peer_read = nxt_h1p_peer_read, + .peer_close = nxt_h1p_peer_close, + .ws_frame_start = nxt_h1p_websocket_frame_start, }, /* NXT_HTTP_PROTO_H2 */ @@ -113,9 +148,9 @@ const nxt_http_proto_table_t nxt_http_proto[3] = { }; -static nxt_lvlhsh_t nxt_h1p_fields_hash; +static nxt_lvlhsh_t nxt_h1p_fields_hash; -static nxt_http_field_proc_t nxt_h1p_fields[] = { +static nxt_http_field_proc_t nxt_h1p_fields[] = { { nxt_string("Connection"), &nxt_h1p_connection, 0 }, { nxt_string("Upgrade"), &nxt_h1p_upgrade, 0 }, { nxt_string("Sec-WebSocket-Key"), &nxt_h1p_websocket_key, 0 }, @@ -136,11 +171,32 @@ static nxt_http_field_proc_t nxt_h1p_fields[] = { }; +static nxt_lvlhsh_t nxt_h1p_peer_fields_hash; + +static nxt_http_field_proc_t nxt_h1p_peer_fields[] = { + { nxt_string("Connection"), &nxt_http_proxy_skip, 0 }, + { nxt_string("Transfer-Encoding"), &nxt_http_proxy_skip, 0 }, + { nxt_string("Server"), &nxt_http_proxy_skip, 0 }, + { nxt_string("Date"), &nxt_http_proxy_date, 0 }, + { nxt_string("Content-Length"), &nxt_http_proxy_content_length, 0 }, +}; + + nxt_int_t nxt_h1p_init(nxt_task_t *task, nxt_runtime_t *rt) { - return nxt_http_fields_hash(&nxt_h1p_fields_hash, rt->mem_pool, - nxt_h1p_fields, nxt_nitems(nxt_h1p_fields)); + nxt_int_t ret; + + ret = nxt_http_fields_hash(&nxt_h1p_fields_hash, rt->mem_pool, + nxt_h1p_fields, nxt_nitems(nxt_h1p_fields)); + + if (nxt_fast_path(ret == NXT_OK)) { + ret = nxt_http_fields_hash(&nxt_h1p_peer_fields_hash, + rt->mem_pool, nxt_h1p_peer_fields, + nxt_nitems(nxt_h1p_peer_fields)); + } + + return ret; } @@ -303,7 +359,7 @@ nxt_http_conn_test(nxt_task_t *task, void *obj, void *data) * Listening socket had been closed while * connection was in keep-alive state. */ - nxt_h1p_shutdown(task, c); + nxt_h1p_closing(task, c); return; } @@ -388,7 +444,7 @@ nxt_h1p_conn_proto_init(nxt_task_t *task, void *obj, void *data) h1p = nxt_mp_zget(c->mem_pool, sizeof(nxt_h1proto_t)); if (nxt_slow_path(h1p == NULL)) { - nxt_h1p_shutdown(task, c); + nxt_h1p_closing(task, c); return; } @@ -452,7 +508,7 @@ nxt_h1p_conn_request_init(nxt_task_t *task, void *obj, void *data) nxt_mp_release(r->mem_pool); } - nxt_h1p_shutdown(task, c); + nxt_h1p_closing(task, c); } @@ -668,6 +724,7 @@ nxt_h1p_connection(void *ctx, nxt_http_field_t *field, uintptr_t data) nxt_http_request_t *r; r = ctx; + field->hopbyhop = 1; if (field->value_length == 5 && nxt_memcmp(field->value, "close", 5) == 0) { r->proto.h1->keepalive = 0; @@ -738,6 +795,8 @@ nxt_h1p_transfer_encoding(void *ctx, nxt_http_field_t *field, uintptr_t data) nxt_http_request_t *r; r = ctx; + field->skip = 1; + field->hopbyhop = 1; if (field->value_length == 7 && nxt_memcmp(field->value, "chunked", 7) == 0) @@ -1003,7 +1062,7 @@ static const nxt_str_t nxt_http_server_error[] = { static void nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r, - nxt_work_handler_t body_handler) + nxt_work_handler_t body_handler, void *data) { u_char *p; size_t size; @@ -1180,7 +1239,7 @@ nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r, * in engine->write_work_queue. */ nxt_work_queue_add(&task->thread->engine->fast_work_queue, - body_handler, task, r, NULL); + body_handler, task, r, data); } else { header->next = nxt_http_buf_last(r); @@ -1784,14 +1843,31 @@ nxt_h1p_shutdown(nxt_task_t *task, nxt_conn_t *c) } } else { - nxt_h1p_shutdown_(task, c); + nxt_h1p_closing(task, c); } } static void -nxt_h1p_shutdown_(nxt_task_t *task, nxt_conn_t *c) +nxt_h1p_conn_ws_shutdown(nxt_task_t *task, void *obj, void *data) +{ + nxt_timer_t *timer; + nxt_h1p_websocket_timer_t *ws_timer; + + nxt_debug(task, "h1p conn ws shutdown"); + + timer = obj; + ws_timer = nxt_timer_data(timer, nxt_h1p_websocket_timer_t, timer); + + nxt_h1p_closing(task, ws_timer->h1p->conn); +} + + +static void +nxt_h1p_closing(nxt_task_t *task, nxt_conn_t *c) { + nxt_debug(task, "h1p closing"); + c->socket.data = NULL; #if (NXT_TLS) @@ -1823,21 +1899,6 @@ static const nxt_conn_state_t nxt_h1p_shutdown_state static void -nxt_h1p_conn_ws_shutdown(nxt_task_t *task, void *obj, void *data) -{ - nxt_timer_t *timer; - nxt_h1p_websocket_timer_t *ws_timer; - - nxt_debug(task, "h1p conn ws shutdown"); - - timer = obj; - ws_timer = nxt_timer_data(timer, nxt_h1p_websocket_timer_t, timer); - - nxt_h1p_shutdown_(task, ws_timer->h1p->conn); -} - - -static void nxt_h1p_conn_closing(nxt_task_t *task, void *obj, void *data) { nxt_conn_t *c; @@ -1882,3 +1943,673 @@ nxt_h1p_conn_free(nxt_task_t *task, void *obj, void *data) nxt_router_listen_event_release(&engine->task, lev, NULL); } + + +static void +nxt_h1p_peer_connect(nxt_task_t *task, nxt_http_peer_t *peer) +{ + nxt_mp_t *mp; + nxt_int_t ret; + nxt_conn_t *c, *client; + nxt_h1proto_t *h1p; + nxt_fd_event_t *socket; + nxt_work_queue_t *wq; + nxt_http_request_t *r; + + nxt_debug(task, "h1p peer connect"); + + peer->status = NXT_HTTP_UNSET; + r = peer->request; + + mp = nxt_mp_create(1024, 128, 256, 32); + + if (nxt_slow_path(mp == NULL)) { + goto fail; + } + + h1p = nxt_mp_zalloc(mp, sizeof(nxt_h1proto_t)); + if (nxt_slow_path(h1p == NULL)) { + goto fail; + } + + ret = nxt_http_parse_request_init(&h1p->parser, r->mem_pool); + if (nxt_slow_path(ret != NXT_OK)) { + goto fail; + } + + c = nxt_conn_create(mp, task); + if (nxt_slow_path(c == NULL)) { + goto fail; + } + + c->mem_pool = mp; + h1p->conn = c; + + peer->proto.h1 = h1p; + h1p->request = r; + + c->socket.task = task; + c->read_timer.task = task; + c->write_timer.task = task; + c->socket.data = peer; + c->remote = peer->sockaddr; + + c->socket.write_ready = 1; + c->write_state = &nxt_h1p_peer_connect_state; + + /* + * TODO: queues should be implemented via client proto interface. + */ + client = r->proto.h1->conn; + + socket = &client->socket; + wq = socket->read_work_queue; + c->read_work_queue = wq; + c->socket.read_work_queue = wq; + c->read_timer.work_queue = wq; + + wq = socket->write_work_queue; + c->write_work_queue = wq; + c->socket.write_work_queue = wq; + c->write_timer.work_queue = wq; + /* TODO END */ + + nxt_conn_connect(task->thread->engine, c); + + return; + +fail: + + peer->status = NXT_HTTP_INTERNAL_SERVER_ERROR; + + r->state->error_handler(task, r, peer); +} + + +static const nxt_conn_state_t nxt_h1p_peer_connect_state + nxt_aligned(64) = +{ + .ready_handler = nxt_h1p_peer_connected, + .close_handler = nxt_h1p_peer_refused, + .error_handler = nxt_h1p_peer_error, + + .timer_handler = nxt_h1p_peer_send_timeout, + .timer_value = nxt_h1p_peer_timer_value, + .timer_data = offsetof(nxt_socket_conf_t, proxy_timeout), +}; + + +static void +nxt_h1p_peer_connected(nxt_task_t *task, void *obj, void *data) +{ + nxt_http_peer_t *peer; + nxt_http_request_t *r; + + peer = data; + + nxt_debug(task, "h1p peer connected"); + + r = peer->request; + r->state->ready_handler(task, r, peer); +} + + +static void +nxt_h1p_peer_refused(nxt_task_t *task, void *obj, void *data) +{ + nxt_http_peer_t *peer; + nxt_http_request_t *r; + + peer = data; + + nxt_debug(task, "h1p peer refused"); + + //peer->status = NXT_HTTP_SERVICE_UNAVAILABLE; + peer->status = NXT_HTTP_BAD_GATEWAY; + + r = peer->request; + r->state->error_handler(task, r, peer); +} + + +static void +nxt_h1p_peer_header_send(nxt_task_t *task, nxt_http_peer_t *peer) +{ + u_char *p; + size_t size; + nxt_buf_t *header, *body; + nxt_conn_t *c; + nxt_http_field_t *field; + nxt_http_request_t *r; + + nxt_debug(task, "h1p peer header send"); + + r = peer->request; + + size = r->method->length + sizeof(" ") + r->target.length + + sizeof(" HTTP/1.0\r\n") + + sizeof("\r\n"); + + nxt_list_each(field, r->fields) { + + if (!field->hopbyhop) { + size += field->name_length + field->value_length; + size += nxt_length(": \r\n"); + } + + } nxt_list_loop; + + header = nxt_http_buf_mem(task, r, size); + if (nxt_slow_path(header == NULL)) { + r->state->error_handler(task, r, peer); + return; + } + + p = header->mem.free; + + p = nxt_cpymem(p, r->method->start, r->method->length); + *p++ = ' '; + p = nxt_cpymem(p, r->target.start, r->target.length); + p = nxt_cpymem(p, " HTTP/1.0\r\n", 11); + + nxt_list_each(field, r->fields) { + + if (!field->hopbyhop) { + p = nxt_cpymem(p, field->name, field->name_length); + *p++ = ':'; *p++ = ' '; + p = nxt_cpymem(p, field->value, field->value_length); + *p++ = '\r'; *p++ = '\n'; + } + + } nxt_list_loop; + + *p++ = '\r'; *p++ = '\n'; + header->mem.free = p; + size = p - header->mem.pos; + + c = peer->proto.h1->conn; + c->write = header; + c->write_state = &nxt_h1p_peer_header_send_state; + + if (r->body != NULL) { + body = nxt_buf_mem_alloc(r->mem_pool, 0, 0); + if (nxt_slow_path(body == NULL)) { + r->state->error_handler(task, r, peer); + return; + } + + header->next = body; + + body->mem = r->body->mem; + size += nxt_buf_mem_used_size(&body->mem); + +// nxt_mp_retain(r->mem_pool); + } + + if (size > 16384) { + /* Use proxy_send_timeout instead of proxy_timeout. */ + c->write_state = &nxt_h1p_peer_header_body_send_state; + } + + nxt_conn_write(task->thread->engine, c); +} + + +static const nxt_conn_state_t nxt_h1p_peer_header_send_state + nxt_aligned(64) = +{ + .ready_handler = nxt_h1p_peer_header_sent, + .error_handler = nxt_h1p_peer_error, + + .timer_handler = nxt_h1p_peer_send_timeout, + .timer_value = nxt_h1p_peer_timer_value, + .timer_data = offsetof(nxt_socket_conf_t, proxy_timeout), +}; + + +static const nxt_conn_state_t nxt_h1p_peer_header_body_send_state + nxt_aligned(64) = +{ + .ready_handler = nxt_h1p_peer_header_sent, + .error_handler = nxt_h1p_peer_error, + + .timer_handler = nxt_h1p_peer_send_timeout, + .timer_value = nxt_h1p_peer_timer_value, + .timer_data = offsetof(nxt_socket_conf_t, proxy_send_timeout), + .timer_autoreset = 1, +}; + + +static void +nxt_h1p_peer_header_sent(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + nxt_http_peer_t *peer; + nxt_http_request_t *r; + nxt_event_engine_t *engine; + + c = obj; + peer = data; + + nxt_debug(task, "h1p peer header sent"); + + engine = task->thread->engine; + + c->write = nxt_sendbuf_completion(task, &engine->fast_work_queue, c->write); + + if (c->write == NULL) { + r = peer->request; + r->state->ready_handler(task, r, peer); + return; + } + + nxt_conn_write(engine, c); +} + + +static void +nxt_h1p_peer_header_read(nxt_task_t *task, nxt_http_peer_t *peer) +{ + nxt_conn_t *c; + + nxt_debug(task, "h1p peer header read"); + + c = peer->proto.h1->conn; + + if (c->write_timer.enabled) { + c->read_state = &nxt_h1p_peer_header_read_state; + + } else { + c->read_state = &nxt_h1p_peer_header_read_timer_state; + } + + nxt_conn_read(task->thread->engine, c); +} + + +static const nxt_conn_state_t nxt_h1p_peer_header_read_state + nxt_aligned(64) = +{ + .ready_handler = nxt_h1p_peer_header_read_done, + .close_handler = nxt_h1p_peer_closed, + .error_handler = nxt_h1p_peer_error, + + .io_read_handler = nxt_h1p_peer_io_read_handler, +}; + + +static const nxt_conn_state_t nxt_h1p_peer_header_read_timer_state + nxt_aligned(64) = +{ + .ready_handler = nxt_h1p_peer_header_read_done, + .close_handler = nxt_h1p_peer_closed, + .error_handler = nxt_h1p_peer_error, + + .io_read_handler = nxt_h1p_peer_io_read_handler, + + .timer_handler = nxt_h1p_peer_read_timeout, + .timer_value = nxt_h1p_peer_timer_value, + .timer_data = offsetof(nxt_socket_conf_t, proxy_timeout), +}; + + +static ssize_t +nxt_h1p_peer_io_read_handler(nxt_task_t *task, nxt_conn_t *c) +{ + size_t size; + ssize_t n; + nxt_buf_t *b; + nxt_http_peer_t *peer; + nxt_socket_conf_t *skcf; + nxt_http_request_t *r; + + peer = c->socket.data; + r = peer->request; + b = c->read; + + if (b == NULL) { + skcf = r->conf->socket_conf; + + size = (peer->header_received) ? skcf->proxy_buffer_size + : skcf->proxy_header_buffer_size; + + nxt_debug(task, "h1p peer io read: %z", size); + + b = nxt_http_proxy_buf_mem_alloc(task, r, size); + if (nxt_slow_path(b == NULL)) { + c->socket.error = NXT_ENOMEM; + return NXT_ERROR; + } + } + + n = c->io->recvbuf(c, b); + + if (n > 0) { + c->read = b; + + } else { + c->read = NULL; + nxt_http_proxy_buf_mem_free(task, r, b); + } + + return n; +} + + +static void +nxt_h1p_peer_header_read_done(nxt_task_t *task, void *obj, void *data) +{ + nxt_int_t ret; + nxt_buf_t *b; + nxt_conn_t *c; + nxt_http_peer_t *peer; + nxt_http_request_t *r; + nxt_event_engine_t *engine; + + c = obj; + peer = data; + + nxt_debug(task, "h1p peer header read done"); + + b = c->read; + + ret = nxt_h1p_peer_header_parse(peer, &b->mem); + + r = peer->request; + + ret = nxt_expect(NXT_DONE, ret); + + if (ret != NXT_AGAIN) { + engine = task->thread->engine; + nxt_timer_disable(engine, &c->write_timer); + nxt_timer_disable(engine, &c->read_timer); + } + + switch (ret) { + + case NXT_DONE: + peer->fields = peer->proto.h1->parser.fields; + + ret = nxt_http_fields_process(peer->fields, + &nxt_h1p_peer_fields_hash, r); + if (nxt_slow_path(ret != NXT_OK)) { + peer->status = NXT_HTTP_INTERNAL_SERVER_ERROR; + break; + } + + c->read = NULL; + + if (nxt_buf_mem_used_size(&b->mem) != 0) { + peer->body = b; + } + + peer->header_received = 1; + + r->state->ready_handler(task, r, peer); + return; + + case NXT_AGAIN: + if (nxt_buf_mem_free_size(&b->mem) != 0) { + nxt_conn_read(task->thread->engine, c); + return; + } + + /* Fall through. */ + + default: + case NXT_ERROR: + case NXT_HTTP_PARSE_INVALID: + case NXT_HTTP_PARSE_UNSUPPORTED_VERSION: + case NXT_HTTP_PARSE_TOO_LARGE_FIELD: + peer->status = NXT_HTTP_BAD_GATEWAY; + break; + } + + nxt_http_proxy_buf_mem_free(task, r, b); + + r->state->error_handler(task, r, peer); +} + + +static nxt_int_t +nxt_h1p_peer_header_parse(nxt_http_peer_t *peer, nxt_buf_mem_t *bm) +{ + u_char *p; + size_t length; + nxt_int_t status; + + if (peer->status < 0) { + length = nxt_buf_mem_used_size(bm); + + if (nxt_slow_path(length < 12)) { + return NXT_AGAIN; + } + + p = bm->pos; + + if (nxt_slow_path(nxt_memcmp(p, "HTTP/1.", 7) != 0 + || (p[7] != '0' && p[7] != '1'))) + { + return NXT_ERROR; + } + + status = nxt_int_parse(&p[9], 3); + + if (nxt_slow_path(status < 0)) { + return NXT_ERROR; + } + + p += 12; + length -= 12; + + p = nxt_memchr(p, '\n', length); + + if (nxt_slow_path(p == NULL)) { + return NXT_AGAIN; + } + + bm->pos = p + 1; + peer->status = status; + } + + return nxt_http_parse_fields(&peer->proto.h1->parser, bm); +} + + +static void +nxt_h1p_peer_read(nxt_task_t *task, nxt_http_peer_t *peer) +{ + nxt_conn_t *c; + + nxt_debug(task, "h1p peer read"); + + c = peer->proto.h1->conn; + c->read_state = &nxt_h1p_peer_read_state; + + nxt_conn_read(task->thread->engine, c); +} + + +static const nxt_conn_state_t nxt_h1p_peer_read_state + nxt_aligned(64) = +{ + .ready_handler = nxt_h1p_peer_read_done, + .close_handler = nxt_h1p_peer_closed, + .error_handler = nxt_h1p_peer_error, + + .io_read_handler = nxt_h1p_peer_io_read_handler, + + .timer_handler = nxt_h1p_peer_read_timeout, + .timer_value = nxt_h1p_peer_timer_value, + .timer_data = offsetof(nxt_socket_conf_t, proxy_read_timeout), + .timer_autoreset = 1, +}; + + +static void +nxt_h1p_peer_read_done(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + nxt_http_peer_t *peer; + nxt_http_request_t *r; + + c = obj; + peer = data; + + nxt_debug(task, "h1p peer read done"); + + peer->body = c->read; + c->read = NULL; + + r = peer->request; + r->state->ready_handler(task, r, peer); +} + + +static void +nxt_h1p_peer_closed(nxt_task_t *task, void *obj, void *data) +{ + nxt_http_peer_t *peer; + nxt_http_request_t *r; + + peer = data; + + nxt_debug(task, "h1p peer closed"); + + r = peer->request; + + if (peer->header_received) { + peer->body = nxt_http_buf_last(r); + + peer->closed = 1; + + r->state->ready_handler(task, r, peer); + + } else { + peer->status = NXT_HTTP_BAD_GATEWAY; + + r->state->error_handler(task, r, peer); + } +} + + +static void +nxt_h1p_peer_error(nxt_task_t *task, void *obj, void *data) +{ + nxt_http_peer_t *peer; + nxt_http_request_t *r; + + peer = data; + + nxt_debug(task, "h1p peer error"); + + peer->status = NXT_HTTP_BAD_GATEWAY; + + r = peer->request; + r->state->error_handler(task, r, peer); +} + + +static void +nxt_h1p_peer_send_timeout(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + nxt_timer_t *timer; + nxt_http_peer_t *peer; + nxt_http_request_t *r; + + timer = obj; + + nxt_debug(task, "h1p peer send timeout"); + + c = nxt_write_timer_conn(timer); + c->block_write = 1; + c->block_read = 1; + + peer = c->socket.data; + peer->status = NXT_HTTP_GATEWAY_TIMEOUT; + + r = peer->request; + r->state->error_handler(task, r, peer); +} + + +static void +nxt_h1p_peer_read_timeout(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + nxt_timer_t *timer; + nxt_http_peer_t *peer; + nxt_http_request_t *r; + + timer = obj; + + nxt_debug(task, "h1p peer read timeout"); + + c = nxt_read_timer_conn(timer); + c->block_write = 1; + c->block_read = 1; + + peer = c->socket.data; + peer->status = NXT_HTTP_GATEWAY_TIMEOUT; + + r = peer->request; + r->state->error_handler(task, r, peer); +} + + +static nxt_msec_t +nxt_h1p_peer_timer_value(nxt_conn_t *c, uintptr_t data) +{ + nxt_http_peer_t *peer; + + peer = c->socket.data; + + return nxt_value_at(nxt_msec_t, peer->request->conf->socket_conf, data); +} + + +static void +nxt_h1p_peer_close(nxt_task_t *task, nxt_http_peer_t *peer) +{ + nxt_conn_t *c; + + nxt_debug(task, "h1p peer close"); + + peer->closed = 1; + + c = peer->proto.h1->conn; + task = &c->task; + c->socket.task = task; + c->read_timer.task = task; + c->write_timer.task = task; + + if (c->socket.fd != -1) { + c->write_state = &nxt_h1p_peer_close_state; + + nxt_conn_close(task->thread->engine, c); + + } else { + nxt_h1p_peer_free(task, c, NULL); + } +} + + +static const nxt_conn_state_t nxt_h1p_peer_close_state + nxt_aligned(64) = +{ + .ready_handler = nxt_h1p_peer_free, +}; + + +static void +nxt_h1p_peer_free(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + + c = obj; + + nxt_debug(task, "h1p peer free"); + + nxt_conn_free(task, c); +} diff --git a/src/nxt_h1proto.h b/src/nxt_h1proto.h index c6d3bd53..61da6770 100644 --- a/src/nxt_h1proto.h +++ b/src/nxt_h1proto.h @@ -20,6 +20,8 @@ struct nxt_h1proto_s { nxt_http_request_parse_t parser; uint8_t nbuffers; + uint8_t header_buffer_slot; + uint8_t large_buffer_slot; uint8_t keepalive; /* 1 bit */ uint8_t chunked; /* 1 bit */ uint8_t websocket; /* 1 bit */ diff --git a/src/nxt_http.h b/src/nxt_http.h index bbfaa33e..784b12c4 100644 --- a/src/nxt_http.h +++ b/src/nxt_http.h @@ -9,6 +9,7 @@ typedef enum { + NXT_HTTP_UNSET = -1, NXT_HTTP_INVALID = 0, NXT_HTTP_CONTINUE = 100, @@ -105,6 +106,20 @@ typedef struct { } nxt_http_response_t; +typedef struct { + nxt_http_proto_t proto; + nxt_http_request_t *request; + nxt_sockaddr_t *sockaddr; + nxt_list_t *fields; + nxt_buf_t *body; + + nxt_http_status_t status:16; + nxt_http_protocol_t protocol:8; /* 2 bits */ + uint8_t header_received; /* 1 bit */ + uint8_t closed; /* 1 bit */ +} nxt_http_peer_t; + + struct nxt_http_request_s { nxt_http_proto_t proto; nxt_socket_conf_joint_t *conf; @@ -144,6 +159,7 @@ struct nxt_http_request_s { void *req_rpc_data; + nxt_http_peer_t *peer; nxt_buf_t *last; nxt_http_response_t resp; @@ -160,6 +176,7 @@ struct nxt_http_request_s { typedef struct nxt_http_route_s nxt_http_route_t; +typedef struct nxt_http_upstream_s nxt_http_upstream_t; struct nxt_http_action_s { @@ -168,6 +185,7 @@ struct nxt_http_action_s { nxt_http_action_t *action); union { nxt_http_route_t *route; + nxt_http_upstream_t *upstream; nxt_app_t *application; } u; @@ -179,12 +197,19 @@ typedef struct { void (*body_read)(nxt_task_t *task, nxt_http_request_t *r); void (*local_addr)(nxt_task_t *task, nxt_http_request_t *r); void (*header_send)(nxt_task_t *task, nxt_http_request_t *r, - nxt_work_handler_t body_handler); + nxt_work_handler_t body_handler, void *data); void (*send)(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *out); nxt_off_t (*body_bytes_sent)(nxt_task_t *task, nxt_http_proto_t proto); void (*discard)(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *last); void (*close)(nxt_task_t *task, nxt_http_proto_t proto, nxt_socket_conf_joint_t *joint); + + void (*peer_connect)(nxt_task_t *task, nxt_http_peer_t *peer); + void (*peer_header_send)(nxt_task_t *task, nxt_http_peer_t *peer); + void (*peer_header_read)(nxt_task_t *task, nxt_http_peer_t *peer); + void (*peer_read)(nxt_task_t *task, nxt_http_peer_t *peer); + void (*peer_close)(nxt_task_t *task, nxt_http_peer_t *peer); + void (*ws_frame_start)(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *ws_frame); } nxt_http_proto_table_t; @@ -219,7 +244,7 @@ void nxt_http_request_error(nxt_task_t *task, nxt_http_request_t *r, nxt_http_status_t status); void nxt_http_request_read_body(nxt_task_t *task, nxt_http_request_t *r); void nxt_http_request_header_send(nxt_task_t *task, nxt_http_request_t *r, - nxt_work_handler_t body_handler); + nxt_work_handler_t body_handler, void *data); void nxt_http_request_ws_frame_start(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *ws_frame); void nxt_http_request_send(nxt_task_t *task, nxt_http_request_t *r, @@ -258,6 +283,18 @@ nxt_str_t *nxt_http_static_mtypes_hash_find(nxt_lvlhsh_t *hash, nxt_http_action_t *nxt_http_application_handler(nxt_task_t *task, nxt_http_request_t *r, nxt_http_action_t *action); +nxt_int_t nxt_http_proxy_create(nxt_mp_t *mp, nxt_http_action_t *action); +nxt_int_t nxt_http_proxy_date(void *ctx, nxt_http_field_t *field, + uintptr_t data); +nxt_int_t nxt_http_proxy_content_length(void *ctx, nxt_http_field_t *field, + uintptr_t data); +nxt_int_t nxt_http_proxy_skip(void *ctx, nxt_http_field_t *field, + uintptr_t data); +nxt_buf_t *nxt_http_proxy_buf_mem_alloc(nxt_task_t *task, nxt_http_request_t *r, + size_t size); +void nxt_http_proxy_buf_mem_free(nxt_task_t *task, nxt_http_request_t *r, + nxt_buf_t *b); + extern nxt_time_string_t nxt_http_date_cache; extern nxt_lvlhsh_t nxt_response_fields_hash; diff --git a/src/nxt_http_error.c b/src/nxt_http_error.c index 8e8b80f1..370b12db 100644 --- a/src/nxt_http_error.c +++ b/src/nxt_http_error.c @@ -57,8 +57,8 @@ nxt_http_request_error(nxt_task_t *task, nxt_http_request_t *r, r->state = &nxt_http_request_send_error_body_state; - nxt_http_request_header_send(task, r, nxt_http_request_send_error_body); - + nxt_http_request_header_send(task, r, + nxt_http_request_send_error_body, NULL); return; fail: diff --git a/src/nxt_http_parse.c b/src/nxt_http_parse.c index e6e91454..4c5d4936 100644 --- a/src/nxt_http_parse.c +++ b/src/nxt_http_parse.c @@ -787,6 +787,7 @@ nxt_http_parse_field_end(nxt_http_request_parse_t *rp, u_char **pos, field->hash = nxt_http_field_hash_end(rp->field_hash); field->skip = 0; + field->hopbyhop = 0; field->name_length = rp->field_name.length; field->value_length = rp->field_value.length; diff --git a/src/nxt_http_parse.h b/src/nxt_http_parse.h index d7ce5e4f..d319c71d 100644 --- a/src/nxt_http_parse.h +++ b/src/nxt_http_parse.h @@ -81,7 +81,8 @@ typedef struct { struct nxt_http_field_s { uint16_t hash; - uint8_t skip; /* 1 bit */ + uint8_t skip:1; + uint8_t hopbyhop:1; uint8_t name_length; uint32_t value_length; u_char *name; diff --git a/src/nxt_http_proxy.c b/src/nxt_http_proxy.c new file mode 100644 index 00000000..94856684 --- /dev/null +++ b/src/nxt_http_proxy.c @@ -0,0 +1,380 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + +#include <nxt_router.h> +#include <nxt_http.h> + + +typedef void (*nxt_http_upstream_connect_t)(nxt_task_t *task, + nxt_http_upstream_t *upstream, nxt_http_peer_t *peer); + + +struct nxt_http_upstream_s { + uint32_t current; + uint32_t n; + uint8_t protocol; + nxt_http_upstream_connect_t connect; + nxt_sockaddr_t *sockaddr[1]; +}; + + +static void nxt_http_upstream_connect(nxt_task_t *task, + nxt_http_upstream_t *upstream, nxt_http_peer_t *peer); +static nxt_http_action_t *nxt_http_proxy_handler(nxt_task_t *task, + nxt_http_request_t *r, nxt_http_action_t *action); +static void nxt_http_proxy_header_send(nxt_task_t *task, void *obj, void *data); +static void nxt_http_proxy_header_sent(nxt_task_t *task, void *obj, void *data); +static void nxt_http_proxy_header_read(nxt_task_t *task, void *obj, void *data); +static void nxt_http_proxy_send_body(nxt_task_t *task, void *obj, void *data); +static void nxt_http_proxy_read(nxt_task_t *task, void *obj, void *data); +static void nxt_http_proxy_buf_mem_completion(nxt_task_t *task, void *obj, + void *data); +static void nxt_http_proxy_error(nxt_task_t *task, void *obj, void *data); + + +static const nxt_http_request_state_t nxt_http_proxy_header_send_state; +static const nxt_http_request_state_t nxt_http_proxy_header_sent_state; +static const nxt_http_request_state_t nxt_http_proxy_header_read_state; +static const nxt_http_request_state_t nxt_http_proxy_read_state; + + +nxt_int_t +nxt_http_proxy_create(nxt_mp_t *mp, nxt_http_action_t *action) +{ + nxt_str_t name; + nxt_sockaddr_t *sa; + nxt_http_upstream_t *upstream; + + sa = NULL; + name = action->name; + + if (nxt_str_start(&name, "http://", 7)) { + name.length -= 7; + name.start += 7; + + sa = nxt_sockaddr_parse(mp, &name); + if (nxt_slow_path(sa == NULL)) { + return NXT_ERROR; + } + + sa->type = SOCK_STREAM; + } + + if (sa != NULL) { + upstream = nxt_mp_alloc(mp, sizeof(nxt_http_upstream_t)); + if (nxt_slow_path(upstream == NULL)) { + return NXT_ERROR; + } + + upstream->current = 0; + upstream->n = 1; + upstream->protocol = NXT_HTTP_PROTO_H1; + upstream->connect = nxt_http_upstream_connect; + upstream->sockaddr[0] = sa; + + action->u.upstream = upstream; + action->handler = nxt_http_proxy_handler; + } + + return NXT_OK; +} + + +static nxt_http_action_t * +nxt_http_proxy_handler(nxt_task_t *task, nxt_http_request_t *r, + nxt_http_action_t *action) +{ + nxt_http_peer_t *peer; + + peer = nxt_mp_zalloc(r->mem_pool, sizeof(nxt_http_peer_t)); + if (nxt_slow_path(peer == NULL)) { + nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); + return NULL; + } + + peer->request = r; + r->peer = peer; + + nxt_mp_retain(r->mem_pool); + + action->u.upstream->connect(task, action->u.upstream, peer); + + return NULL; +} + + +static void +nxt_http_upstream_connect(nxt_task_t *task, nxt_http_upstream_t *upstream, + nxt_http_peer_t *peer) +{ + peer->protocol = upstream->protocol; + peer->sockaddr = upstream->sockaddr[0]; + + peer->request->state = &nxt_http_proxy_header_send_state; + + nxt_http_proto[peer->protocol].peer_connect(task, peer); +} + + +static const nxt_http_request_state_t nxt_http_proxy_header_send_state + nxt_aligned(64) = +{ + .ready_handler = nxt_http_proxy_header_send, + .error_handler = nxt_http_proxy_error, +}; + + +static void +nxt_http_proxy_header_send(nxt_task_t *task, void *obj, void *data) +{ + nxt_http_peer_t *peer; + nxt_http_request_t *r; + + r = obj; + peer = data; + r->state = &nxt_http_proxy_header_sent_state; + + nxt_http_proto[peer->protocol].peer_header_send(task, peer); +} + + +static const nxt_http_request_state_t nxt_http_proxy_header_sent_state + nxt_aligned(64) = +{ + .ready_handler = nxt_http_proxy_header_sent, + .error_handler = nxt_http_proxy_error, +}; + + +static void +nxt_http_proxy_header_sent(nxt_task_t *task, void *obj, void *data) +{ + nxt_http_peer_t *peer; + nxt_http_request_t *r; + + r = obj; + peer = data; + r->state = &nxt_http_proxy_header_read_state; + + nxt_http_proto[peer->protocol].peer_header_read(task, peer); +} + + +static const nxt_http_request_state_t nxt_http_proxy_header_read_state + nxt_aligned(64) = +{ + .ready_handler = nxt_http_proxy_header_read, + .error_handler = nxt_http_proxy_error, +}; + + +static void +nxt_http_proxy_header_read(nxt_task_t *task, void *obj, void *data) +{ + nxt_http_peer_t *peer; + nxt_http_field_t *f, *field; + nxt_http_request_t *r; + + r = obj; + peer = data; + + r->status = peer->status; + + nxt_debug(task, "http proxy status: %d", peer->status); + + nxt_list_each(field, peer->fields) { + + nxt_debug(task, "http proxy header: \"%*s: %*s\"", + (size_t) field->name_length, field->name, + (size_t) field->value_length, field->value); + + if (!field->skip) { + f = nxt_list_add(r->resp.fields); + if (nxt_slow_path(f == NULL)) { + nxt_http_proxy_error(task, r, peer); + return; + } + + *f = *field; + } + + } nxt_list_loop; + + nxt_http_request_header_send(task, r, nxt_http_proxy_send_body, peer); +} + + +static void +nxt_http_proxy_send_body(nxt_task_t *task, void *obj, void *data) +{ + nxt_buf_t *out; + nxt_http_peer_t *peer; + nxt_http_request_t *r; + + r = obj; + peer = data; + out = peer->body; + + if (out != NULL) { + peer->body = NULL; + nxt_http_request_send(task, r, out); + } + + r->state = &nxt_http_proxy_read_state; + + nxt_http_proto[peer->protocol].peer_read(task, peer); +} + + +static const nxt_http_request_state_t nxt_http_proxy_read_state + nxt_aligned(64) = +{ + .ready_handler = nxt_http_proxy_read, + .error_handler = nxt_http_proxy_error, +}; + + +static void +nxt_http_proxy_read(nxt_task_t *task, void *obj, void *data) +{ + nxt_buf_t *out; + nxt_bool_t last; + nxt_http_peer_t *peer; + nxt_http_request_t *r; + + r = obj; + peer = data; + out = peer->body; + peer->body = NULL; + last = nxt_buf_is_last(out); + + nxt_http_request_send(task, r, out); + + if (!last) { + nxt_http_proto[peer->protocol].peer_read(task, peer); + + } else { + nxt_http_proto[peer->protocol].peer_close(task, peer); + + nxt_mp_release(r->mem_pool); + } +} + + +nxt_buf_t * +nxt_http_proxy_buf_mem_alloc(nxt_task_t *task, nxt_http_request_t *r, + size_t size) +{ + nxt_buf_t *b; + + b = nxt_event_engine_buf_mem_alloc(task->thread->engine, size); + if (nxt_fast_path(b != NULL)) { + b->completion_handler = nxt_http_proxy_buf_mem_completion; + b->parent = r; + nxt_mp_retain(r->mem_pool); + + } else { + nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); + } + + return b; +} + + +static void +nxt_http_proxy_buf_mem_completion(nxt_task_t *task, void *obj, void *data) +{ + nxt_buf_t *b, *next; + nxt_http_peer_t *peer; + nxt_http_request_t *r; + + b = obj; + r = data; + + peer = r->peer; + + do { + next = b->next; + + nxt_http_proxy_buf_mem_free(task, r, b); + + b = next; + } while (b != NULL); + + if (!peer->closed) { + nxt_http_proto[peer->protocol].peer_read(task, peer); + } +} + + +void +nxt_http_proxy_buf_mem_free(nxt_task_t *task, nxt_http_request_t *r, + nxt_buf_t *b) +{ + nxt_event_engine_buf_mem_free(task->thread->engine, b); + + nxt_mp_release(r->mem_pool); +} + + +static void +nxt_http_proxy_error(nxt_task_t *task, void *obj, void *data) +{ + nxt_http_peer_t *peer; + nxt_http_request_t *r; + + r = obj; + peer = r->peer; + + nxt_http_proto[peer->protocol].peer_close(task, peer); + + nxt_mp_release(r->mem_pool); + + nxt_http_request_error(task, r, peer->status); +} + + +nxt_int_t +nxt_http_proxy_date(void *ctx, nxt_http_field_t *field, uintptr_t data) +{ + nxt_http_request_t *r; + + r = ctx; + + r->resp.date = field; + + return NXT_OK; +} + + +nxt_int_t +nxt_http_proxy_content_length(void *ctx, nxt_http_field_t *field, + uintptr_t data) +{ + nxt_off_t n; + nxt_http_request_t *r; + + r = ctx; + + r->resp.content_length = field; + + n = nxt_off_t_parse(field->value, field->value_length); + + if (nxt_fast_path(n >= 0)) { + r->resp.content_length_n = n; + } + + return NXT_OK; +} + + +nxt_int_t +nxt_http_proxy_skip(void *ctx, nxt_http_field_t *field, uintptr_t data) +{ + field->skip = 1; + + return NXT_OK; +} diff --git a/src/nxt_http_request.c b/src/nxt_http_request.c index d40393b7..14c75dab 100644 --- a/src/nxt_http_request.c +++ b/src/nxt_http_request.c @@ -370,7 +370,7 @@ nxt_http_request_read_body(nxt_task_t *task, nxt_http_request_t *r) void nxt_http_request_header_send(nxt_task_t *task, nxt_http_request_t *r, - nxt_work_handler_t body_handler) + nxt_work_handler_t body_handler, void *data) { u_char *p, *end; nxt_http_field_t *server, *date, *content_length; @@ -431,7 +431,7 @@ nxt_http_request_header_send(nxt_task_t *task, nxt_http_request_t *r, } if (nxt_fast_path(r->proto.any != NULL)) { - nxt_http_proto[r->protocol].header_send(task, r, body_handler); + nxt_http_proto[r->protocol].header_send(task, r, body_handler, data); } return; diff --git a/src/nxt_http_route.c b/src/nxt_http_route.c index 6104379e..18b352ea 100644 --- a/src/nxt_http_route.c +++ b/src/nxt_http_route.c @@ -38,6 +38,7 @@ typedef enum { typedef struct { nxt_conf_value_t *pass; nxt_conf_value_t *share; + nxt_conf_value_t *proxy; } nxt_http_route_action_conf_t; @@ -519,6 +520,11 @@ static nxt_conf_map_t nxt_http_route_action_conf[] = { NXT_CONF_MAP_PTR, offsetof(nxt_http_route_action_conf_t, share) }, + { + nxt_string("proxy"), + NXT_CONF_MAP_PTR, + offsetof(nxt_http_route_action_conf_t, proxy) + }, }; @@ -526,6 +532,7 @@ static nxt_int_t nxt_http_route_action_create(nxt_router_temp_conf_t *tmcf, nxt_conf_value_t *cv, nxt_http_route_match_t *match) { + nxt_mp_t *mp; nxt_int_t ret; nxt_str_t name, *string; nxt_conf_value_t *conf, *action_conf; @@ -552,16 +559,24 @@ nxt_http_route_action_create(nxt_router_temp_conf_t *tmcf, nxt_conf_value_t *cv, if (accf.share != NULL) { conf = accf.share; match->action.handler = nxt_http_static_handler; + + } else if (accf.proxy != NULL) { + conf = accf.proxy; } nxt_conf_get_string(conf, &name); - string = nxt_str_dup(tmcf->router_conf->mem_pool, - &match->action.name, &name); + mp = tmcf->router_conf->mem_pool; + + string = nxt_str_dup(mp, &match->action.name, &name); if (nxt_slow_path(string == NULL)) { return NXT_ERROR; } + if (accf.proxy != NULL) { + return nxt_http_proxy_create(mp, &match->action); + } + return NXT_OK; } @@ -1013,6 +1028,7 @@ nxt_http_action_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, } action->name = *name; + action->handler = NULL; nxt_http_action_resolve(task, tmcf, action); diff --git a/src/nxt_http_static.c b/src/nxt_http_static.c index ba5e9160..44132859 100644 --- a/src/nxt_http_static.c +++ b/src/nxt_http_static.c @@ -272,7 +272,7 @@ nxt_http_static_handler(nxt_task_t *task, nxt_http_request_t *r, body_handler = NULL; } - nxt_http_request_header_send(task, r, body_handler); + nxt_http_request_header_send(task, r, body_handler, NULL); r->state = &nxt_http_static_send_state; return NULL; diff --git a/src/nxt_router.c b/src/nxt_router.c index 60ee77e5..b9f5d921 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -1678,10 +1678,16 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, skcf->large_header_buffers = 4; skcf->body_buffer_size = 16 * 1024; skcf->max_body_size = 8 * 1024 * 1024; + skcf->proxy_header_buffer_size = 64 * 1024; + skcf->proxy_buffer_size = 4096; + skcf->proxy_buffers = 256; skcf->idle_timeout = 180 * 1000; skcf->header_read_timeout = 30 * 1000; skcf->body_read_timeout = 30 * 1000; skcf->send_timeout = 30 * 1000; + skcf->proxy_timeout = 60 * 1000; + skcf->proxy_send_timeout = 30 * 1000; + skcf->proxy_read_timeout = 30 * 1000; skcf->websocket_conf.max_frame_size = 1024 * 1024; skcf->websocket_conf.read_timeout = 60 * 1000; @@ -3582,6 +3588,7 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, field->hash = f->hash; field->skip = 0; + field->hopbyhop = 0; field->name_length = f->name_length; field->value_length = f->value_length; @@ -3627,7 +3634,7 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_buf_chain_add(&r->out, b); } - nxt_http_request_header_send(task, r, nxt_http_request_send_body); + nxt_http_request_header_send(task, r, nxt_http_request_send_body, NULL); if (r->websocket_handshake && r->status == NXT_HTTP_SWITCHING_PROTOCOLS) diff --git a/src/nxt_router.h b/src/nxt_router.h index 348048e9..1517c14b 100644 --- a/src/nxt_router.h +++ b/src/nxt_router.h @@ -170,10 +170,17 @@ typedef struct { size_t large_header_buffers; size_t body_buffer_size; size_t max_body_size; + size_t proxy_header_buffer_size; + size_t proxy_buffer_size; + size_t proxy_buffers; + nxt_msec_t idle_timeout; nxt_msec_t header_read_timeout; nxt_msec_t body_read_timeout; nxt_msec_t send_timeout; + nxt_msec_t proxy_timeout; + nxt_msec_t proxy_send_timeout; + nxt_msec_t proxy_read_timeout; nxt_websocket_conf_t websocket_conf; diff --git a/src/nxt_socket.c b/src/nxt_socket.c index a89663b1..2a809184 100644 --- a/src/nxt_socket.c +++ b/src/nxt_socket.c @@ -337,6 +337,9 @@ nxt_socket_error_level(nxt_err_t err) case NXT_EHOSTUNREACH: return NXT_LOG_INFO; + case NXT_ECONNREFUSED: + return NXT_LOG_ERR; + default: return NXT_LOG_ALERT; } diff --git a/src/nxt_unit_field.h b/src/nxt_unit_field.h index d19db0f0..b07d3046 100644 --- a/src/nxt_unit_field.h +++ b/src/nxt_unit_field.h @@ -21,7 +21,8 @@ enum { /* Name and Value field aka HTTP header. */ struct nxt_unit_field_s { uint16_t hash; - uint8_t skip; + uint8_t skip:1; + uint8_t hopbyhop:1; uint8_t name_length; uint32_t value_length; |