summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_h1proto.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/nxt_h1proto.c840
1 files changed, 793 insertions, 47 deletions
diff --git a/src/nxt_h1proto.c b/src/nxt_h1proto.c
index 541fcb44..b07eaf84 100644
--- a/src/nxt_h1proto.c
+++ b/src/nxt_h1proto.c
@@ -18,10 +18,10 @@
*/
#if (NXT_TLS)
-static ssize_t nxt_http_idle_io_read_handler(nxt_conn_t *c);
+static ssize_t nxt_http_idle_io_read_handler(nxt_task_t *task, nxt_conn_t *c);
static void nxt_http_conn_test(nxt_task_t *task, void *obj, void *data);
#endif
-static ssize_t nxt_h1p_idle_io_read_handler(nxt_conn_t *c);
+static ssize_t nxt_h1p_idle_io_read_handler(nxt_task_t *task, nxt_conn_t *c);
static void nxt_h1p_conn_proto_init(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_conn_request_init(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_conn_request_header_parse(nxt_task_t *task, void *obj,
@@ -45,7 +45,7 @@ static void nxt_h1p_conn_request_body_read(nxt_task_t *task, void *obj,
void *data);
static void nxt_h1p_request_local_addr(nxt_task_t *task, nxt_http_request_t *r);
static void nxt_h1p_request_header_send(nxt_task_t *task,
- nxt_http_request_t *r, nxt_work_handler_t body_handler);
+ nxt_http_request_t *r, nxt_work_handler_t body_handler, void *data);
static void nxt_h1p_request_send(nxt_task_t *task, nxt_http_request_t *r,
nxt_buf_t *out);
static nxt_buf_t *nxt_h1p_chunk_create(nxt_task_t *task, nxt_http_request_t *r,
@@ -78,11 +78,32 @@ static void nxt_h1p_idle_response_timeout(nxt_task_t *task, void *obj,
static nxt_msec_t nxt_h1p_idle_response_timer_value(nxt_conn_t *c,
uintptr_t data);
static void nxt_h1p_shutdown(nxt_task_t *task, nxt_conn_t *c);
-static void nxt_h1p_shutdown_(nxt_task_t *task, nxt_conn_t *c);
+static void nxt_h1p_closing(nxt_task_t *task, nxt_conn_t *c);
static void nxt_h1p_conn_ws_shutdown(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_conn_closing(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_conn_free(nxt_task_t *task, void *obj, void *data);
+static void nxt_h1p_peer_connect(nxt_task_t *task, nxt_http_peer_t *peer);
+static void nxt_h1p_peer_connected(nxt_task_t *task, void *obj, void *data);
+static void nxt_h1p_peer_refused(nxt_task_t *task, void *obj, void *data);
+static void nxt_h1p_peer_header_send(nxt_task_t *task, nxt_http_peer_t *peer);
+static void nxt_h1p_peer_header_sent(nxt_task_t *task, void *obj, void *data);
+static void nxt_h1p_peer_header_read(nxt_task_t *task, nxt_http_peer_t *peer);
+static ssize_t nxt_h1p_peer_io_read_handler(nxt_task_t *task, nxt_conn_t *c);
+static void nxt_h1p_peer_header_read_done(nxt_task_t *task, void *obj,
+ void *data);
+static nxt_int_t nxt_h1p_peer_header_parse(nxt_http_peer_t *peer,
+ nxt_buf_mem_t *bm);
+static void nxt_h1p_peer_read(nxt_task_t *task, nxt_http_peer_t *peer);
+static void nxt_h1p_peer_read_done(nxt_task_t *task, void *obj, void *data);
+static void nxt_h1p_peer_closed(nxt_task_t *task, void *obj, void *data);
+static void nxt_h1p_peer_error(nxt_task_t *task, void *obj, void *data);
+static void nxt_h1p_peer_send_timeout(nxt_task_t *task, void *obj, void *data);
+static void nxt_h1p_peer_read_timeout(nxt_task_t *task, void *obj, void *data);
+static nxt_msec_t nxt_h1p_peer_timer_value(nxt_conn_t *c, uintptr_t data);
+static void nxt_h1p_peer_close(nxt_task_t *task, nxt_http_peer_t *peer);
+static void nxt_h1p_peer_free(nxt_task_t *task, void *obj, void *data);
+
#if (NXT_TLS)
static const nxt_conn_state_t nxt_http_idle_state;
static const nxt_conn_state_t nxt_h1p_shutdown_state;
@@ -94,6 +115,13 @@ static const nxt_conn_state_t nxt_h1p_request_send_state;
static const nxt_conn_state_t nxt_h1p_timeout_response_state;
static const nxt_conn_state_t nxt_h1p_keepalive_state;
static const nxt_conn_state_t nxt_h1p_close_state;
+static const nxt_conn_state_t nxt_h1p_peer_connect_state;
+static const nxt_conn_state_t nxt_h1p_peer_header_send_state;
+static const nxt_conn_state_t nxt_h1p_peer_header_body_send_state;
+static const nxt_conn_state_t nxt_h1p_peer_header_read_state;
+static const nxt_conn_state_t nxt_h1p_peer_header_read_timer_state;
+static const nxt_conn_state_t nxt_h1p_peer_read_state;
+static const nxt_conn_state_t nxt_h1p_peer_close_state;
const nxt_http_proto_table_t nxt_http_proto[3] = {
@@ -106,6 +134,13 @@ const nxt_http_proto_table_t nxt_http_proto[3] = {
.body_bytes_sent = nxt_h1p_request_body_bytes_sent,
.discard = nxt_h1p_request_discard,
.close = nxt_h1p_request_close,
+
+ .peer_connect = nxt_h1p_peer_connect,
+ .peer_header_send = nxt_h1p_peer_header_send,
+ .peer_header_read = nxt_h1p_peer_header_read,
+ .peer_read = nxt_h1p_peer_read,
+ .peer_close = nxt_h1p_peer_close,
+
.ws_frame_start = nxt_h1p_websocket_frame_start,
},
/* NXT_HTTP_PROTO_H2 */
@@ -113,9 +148,9 @@ const nxt_http_proto_table_t nxt_http_proto[3] = {
};
-static nxt_lvlhsh_t nxt_h1p_fields_hash;
+static nxt_lvlhsh_t nxt_h1p_fields_hash;
-static nxt_http_field_proc_t nxt_h1p_fields[] = {
+static nxt_http_field_proc_t nxt_h1p_fields[] = {
{ nxt_string("Connection"), &nxt_h1p_connection, 0 },
{ nxt_string("Upgrade"), &nxt_h1p_upgrade, 0 },
{ nxt_string("Sec-WebSocket-Key"), &nxt_h1p_websocket_key, 0 },
@@ -136,11 +171,32 @@ static nxt_http_field_proc_t nxt_h1p_fields[] = {
};
+static nxt_lvlhsh_t nxt_h1p_peer_fields_hash;
+
+static nxt_http_field_proc_t nxt_h1p_peer_fields[] = {
+ { nxt_string("Connection"), &nxt_http_proxy_skip, 0 },
+ { nxt_string("Transfer-Encoding"), &nxt_http_proxy_skip, 0 },
+ { nxt_string("Server"), &nxt_http_proxy_skip, 0 },
+ { nxt_string("Date"), &nxt_http_proxy_date, 0 },
+ { nxt_string("Content-Length"), &nxt_http_proxy_content_length, 0 },
+};
+
+
nxt_int_t
nxt_h1p_init(nxt_task_t *task, nxt_runtime_t *rt)
{
- return nxt_http_fields_hash(&nxt_h1p_fields_hash, rt->mem_pool,
- nxt_h1p_fields, nxt_nitems(nxt_h1p_fields));
+ nxt_int_t ret;
+
+ ret = nxt_http_fields_hash(&nxt_h1p_fields_hash, rt->mem_pool,
+ nxt_h1p_fields, nxt_nitems(nxt_h1p_fields));
+
+ if (nxt_fast_path(ret == NXT_OK)) {
+ ret = nxt_http_fields_hash(&nxt_h1p_peer_fields_hash,
+ rt->mem_pool, nxt_h1p_peer_fields,
+ nxt_nitems(nxt_h1p_peer_fields));
+ }
+
+ return ret;
}
@@ -196,7 +252,7 @@ static const nxt_conn_state_t nxt_http_idle_state
static ssize_t
-nxt_http_idle_io_read_handler(nxt_conn_t *c)
+nxt_http_idle_io_read_handler(nxt_task_t *task, nxt_conn_t *c)
{
size_t size;
ssize_t n;
@@ -216,7 +272,7 @@ nxt_http_idle_io_read_handler(nxt_conn_t *c)
size = joint->socket_conf->header_buffer_size;
- b = nxt_buf_mem_alloc(c->mem_pool, size, 0);
+ b = nxt_event_engine_buf_mem_alloc(task->thread->engine, size);
if (nxt_slow_path(b == NULL)) {
c->socket.error = NXT_ENOMEM;
return NXT_ERROR;
@@ -234,7 +290,7 @@ nxt_http_idle_io_read_handler(nxt_conn_t *c)
} else {
c->read = NULL;
- nxt_mp_free(c->mem_pool, b);
+ nxt_event_engine_buf_mem_free(task->thread->engine, b);
}
return n;
@@ -248,12 +304,14 @@ nxt_http_conn_test(nxt_task_t *task, void *obj, void *data)
nxt_buf_t *b;
nxt_conn_t *c;
nxt_tls_conf_t *tls;
+ nxt_event_engine_t *engine;
nxt_socket_conf_joint_t *joint;
c = obj;
nxt_debug(task, "h1p conn https test");
+ engine = task->thread->engine;
b = c->read;
p = b->mem.pos;
@@ -262,7 +320,7 @@ nxt_http_conn_test(nxt_task_t *task, void *obj, void *data)
if (p[0] != 0x16) {
b->mem.free = b->mem.pos;
- nxt_conn_read(task->thread->engine, c);
+ nxt_conn_read(engine, c);
return;
}
@@ -292,7 +350,7 @@ nxt_http_conn_test(nxt_task_t *task, void *obj, void *data)
#endif
c->read = NULL;
- nxt_mp_free(c->mem_pool, b);
+ nxt_event_engine_buf_mem_free(engine, b);
joint = c->listen->socket.data;
@@ -301,7 +359,7 @@ nxt_http_conn_test(nxt_task_t *task, void *obj, void *data)
* Listening socket had been closed while
* connection was in keep-alive state.
*/
- nxt_h1p_shutdown(task, c);
+ nxt_h1p_closing(task, c);
return;
}
@@ -330,7 +388,7 @@ static const nxt_conn_state_t nxt_h1p_idle_state
static ssize_t
-nxt_h1p_idle_io_read_handler(nxt_conn_t *c)
+nxt_h1p_idle_io_read_handler(nxt_task_t *task, nxt_conn_t *c)
{
size_t size;
ssize_t n;
@@ -353,7 +411,7 @@ nxt_h1p_idle_io_read_handler(nxt_conn_t *c)
if (b == NULL) {
size = joint->socket_conf->header_buffer_size;
- b = nxt_buf_mem_alloc(c->mem_pool, size, 0);
+ b = nxt_event_engine_buf_mem_alloc(task->thread->engine, size);
if (nxt_slow_path(b == NULL)) {
c->socket.error = NXT_ENOMEM;
return NXT_ERROR;
@@ -367,7 +425,7 @@ nxt_h1p_idle_io_read_handler(nxt_conn_t *c)
} else {
c->read = NULL;
- nxt_mp_free(c->mem_pool, b);
+ nxt_event_engine_buf_mem_free(task->thread->engine, b);
}
return n;
@@ -386,7 +444,7 @@ nxt_h1p_conn_proto_init(nxt_task_t *task, void *obj, void *data)
h1p = nxt_mp_zget(c->mem_pool, sizeof(nxt_h1proto_t));
if (nxt_slow_path(h1p == NULL)) {
- nxt_h1p_shutdown(task, c);
+ nxt_h1p_closing(task, c);
return;
}
@@ -424,6 +482,12 @@ nxt_h1p_conn_request_init(nxt_task_t *task, void *obj, void *data)
r->tls = c->u.tls;
#endif
+ r->task = c->task;
+ task = &r->task;
+ c->socket.task = task;
+ c->read_timer.task = task;
+ c->write_timer.task = task;
+
ret = nxt_http_parse_request_init(&h1p->parser, r->mem_pool);
if (nxt_fast_path(ret == NXT_OK)) {
@@ -444,7 +508,7 @@ nxt_h1p_conn_request_init(nxt_task_t *task, void *obj, void *data)
nxt_mp_release(r->mem_pool);
}
- nxt_h1p_shutdown(task, c);
+ nxt_h1p_closing(task, c);
}
@@ -599,13 +663,15 @@ nxt_h1p_header_process(nxt_task_t *task, nxt_h1proto_t *h1p,
}
if (nxt_slow_path(h1p->websocket_key == NULL)) {
- nxt_log(task, NXT_LOG_INFO, "h1p upgrade: bad or absent websocket key");
+ nxt_log(task, NXT_LOG_INFO,
+ "h1p upgrade: bad or absent websocket key");
return NXT_HTTP_BAD_REQUEST;
}
if (nxt_slow_path(h1p->websocket_version_ok == 0)) {
- nxt_log(task, NXT_LOG_INFO, "h1p upgrade: bad or absent websocket version");
+ nxt_log(task, NXT_LOG_INFO,
+ "h1p upgrade: bad or absent websocket version");
return NXT_HTTP_UPGRADE_REQUIRED;
}
@@ -655,16 +721,16 @@ nxt_h1p_header_buffer_test(nxt_task_t *task, nxt_h1proto_t *h1p, nxt_conn_t *c,
static nxt_int_t
nxt_h1p_connection(void *ctx, nxt_http_field_t *field, uintptr_t data)
{
- nxt_http_request_t *r;
- static const u_char *upgrade = (const u_char *) "upgrade";
+ nxt_http_request_t *r;
r = ctx;
+ field->hopbyhop = 1;
if (field->value_length == 5 && nxt_memcmp(field->value, "close", 5) == 0) {
r->proto.h1->keepalive = 0;
} else if (field->value_length == 7
- && nxt_memcasecmp(field->value, upgrade, 7) == 0)
+ && nxt_memcasecmp(field->value, "upgrade", 7) == 0)
{
r->proto.h1->connection_upgrade = 1;
}
@@ -676,13 +742,12 @@ nxt_h1p_connection(void *ctx, nxt_http_field_t *field, uintptr_t data)
static nxt_int_t
nxt_h1p_upgrade(void *ctx, nxt_http_field_t *field, uintptr_t data)
{
- nxt_http_request_t *r;
- static const u_char *websocket = (const u_char *) "websocket";
+ nxt_http_request_t *r;
r = ctx;
if (field->value_length == 9
- && nxt_memcasecmp(field->value, websocket, 9) == 0)
+ && nxt_memcasecmp(field->value, "websocket", 9) == 0)
{
r->proto.h1->upgrade_websocket = 1;
}
@@ -730,6 +795,8 @@ nxt_h1p_transfer_encoding(void *ctx, nxt_http_field_t *field, uintptr_t data)
nxt_http_request_t *r;
r = ctx;
+ field->skip = 1;
+ field->hopbyhop = 1;
if (field->value_length == 7
&& nxt_memcmp(field->value, "chunked", 7) == 0)
@@ -995,7 +1062,7 @@ static const nxt_str_t nxt_http_server_error[] = {
static void
nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r,
- nxt_work_handler_t body_handler)
+ nxt_work_handler_t body_handler, void *data)
{
u_char *p;
size_t size;
@@ -1172,7 +1239,7 @@ nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r,
* in engine->write_work_queue.
*/
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
- body_handler, task, r, NULL);
+ body_handler, task, r, data);
} else {
header->next = nxt_http_buf_last(r);
@@ -1211,6 +1278,7 @@ nxt_h1p_complete_buffers(nxt_task_t *task, nxt_h1proto_t *h1p)
while (b != NULL) {
next = b->next;
+ b->next = NULL;
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
b->completion_handler, task, b, b->parent);
@@ -1226,7 +1294,8 @@ nxt_h1p_complete_buffers(nxt_task_t *task, nxt_h1proto_t *h1p)
size = nxt_buf_mem_used_size(&in->mem);
if (size == 0) {
- nxt_mp_free(c->mem_pool, in);
+ nxt_work_queue_add(&task->thread->engine->fast_work_queue,
+ in->completion_handler, task, in, in->parent);
c->read = NULL;
}
@@ -1480,11 +1549,16 @@ nxt_h1p_request_close(nxt_task_t *task, nxt_http_proto_t proto,
nxt_debug(task, "h1p request close");
h1p = proto.h1;
+ h1p->keepalive &= !h1p->request->inconsistent;
h1p->request = NULL;
nxt_router_conf_release(task, joint);
c = h1p->conn;
+ task = &c->task;
+ c->socket.task = task;
+ c->read_timer.task = task;
+ c->write_timer.task = task;
if (h1p->keepalive) {
nxt_h1p_keepalive(task, h1p, c);
@@ -1770,14 +1844,31 @@ nxt_h1p_shutdown(nxt_task_t *task, nxt_conn_t *c)
}
} else {
- nxt_h1p_shutdown_(task, c);
+ nxt_h1p_closing(task, c);
}
}
static void
-nxt_h1p_shutdown_(nxt_task_t *task, nxt_conn_t *c)
+nxt_h1p_conn_ws_shutdown(nxt_task_t *task, void *obj, void *data)
{
+ nxt_timer_t *timer;
+ nxt_h1p_websocket_timer_t *ws_timer;
+
+ nxt_debug(task, "h1p conn ws shutdown");
+
+ timer = obj;
+ ws_timer = nxt_timer_data(timer, nxt_h1p_websocket_timer_t, timer);
+
+ nxt_h1p_closing(task, ws_timer->h1p->conn);
+}
+
+
+static void
+nxt_h1p_closing(nxt_task_t *task, nxt_conn_t *c)
+{
+ nxt_debug(task, "h1p closing");
+
c->socket.data = NULL;
#if (NXT_TLS)
@@ -1809,21 +1900,6 @@ static const nxt_conn_state_t nxt_h1p_shutdown_state
static void
-nxt_h1p_conn_ws_shutdown(nxt_task_t *task, void *obj, void *data)
-{
- nxt_timer_t *timer;
- nxt_h1p_websocket_timer_t *ws_timer;
-
- nxt_debug(task, "h1p conn ws shutdown");
-
- timer = obj;
- ws_timer = nxt_timer_data(timer, nxt_h1p_websocket_timer_t, timer);
-
- nxt_h1p_shutdown_(task, ws_timer->h1p->conn);
-}
-
-
-static void
nxt_h1p_conn_closing(nxt_task_t *task, void *obj, void *data)
{
nxt_conn_t *c;
@@ -1868,3 +1944,673 @@ nxt_h1p_conn_free(nxt_task_t *task, void *obj, void *data)
nxt_router_listen_event_release(&engine->task, lev, NULL);
}
+
+
+static void
+nxt_h1p_peer_connect(nxt_task_t *task, nxt_http_peer_t *peer)
+{
+ nxt_mp_t *mp;
+ nxt_int_t ret;
+ nxt_conn_t *c, *client;
+ nxt_h1proto_t *h1p;
+ nxt_fd_event_t *socket;
+ nxt_work_queue_t *wq;
+ nxt_http_request_t *r;
+
+ nxt_debug(task, "h1p peer connect");
+
+ peer->status = NXT_HTTP_UNSET;
+ r = peer->request;
+
+ mp = nxt_mp_create(1024, 128, 256, 32);
+
+ if (nxt_slow_path(mp == NULL)) {
+ goto fail;
+ }
+
+ h1p = nxt_mp_zalloc(mp, sizeof(nxt_h1proto_t));
+ if (nxt_slow_path(h1p == NULL)) {
+ goto fail;
+ }
+
+ ret = nxt_http_parse_request_init(&h1p->parser, r->mem_pool);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ goto fail;
+ }
+
+ c = nxt_conn_create(mp, task);
+ if (nxt_slow_path(c == NULL)) {
+ goto fail;
+ }
+
+ c->mem_pool = mp;
+ h1p->conn = c;
+
+ peer->proto.h1 = h1p;
+ h1p->request = r;
+
+ c->socket.task = task;
+ c->read_timer.task = task;
+ c->write_timer.task = task;
+ c->socket.data = peer;
+ c->remote = peer->sockaddr;
+
+ c->socket.write_ready = 1;
+ c->write_state = &nxt_h1p_peer_connect_state;
+
+ /*
+ * TODO: queues should be implemented via client proto interface.
+ */
+ client = r->proto.h1->conn;
+
+ socket = &client->socket;
+ wq = socket->read_work_queue;
+ c->read_work_queue = wq;
+ c->socket.read_work_queue = wq;
+ c->read_timer.work_queue = wq;
+
+ wq = socket->write_work_queue;
+ c->write_work_queue = wq;
+ c->socket.write_work_queue = wq;
+ c->write_timer.work_queue = wq;
+ /* TODO END */
+
+ nxt_conn_connect(task->thread->engine, c);
+
+ return;
+
+fail:
+
+ peer->status = NXT_HTTP_INTERNAL_SERVER_ERROR;
+
+ r->state->error_handler(task, r, peer);
+}
+
+
+static const nxt_conn_state_t nxt_h1p_peer_connect_state
+ nxt_aligned(64) =
+{
+ .ready_handler = nxt_h1p_peer_connected,
+ .close_handler = nxt_h1p_peer_refused,
+ .error_handler = nxt_h1p_peer_error,
+
+ .timer_handler = nxt_h1p_peer_send_timeout,
+ .timer_value = nxt_h1p_peer_timer_value,
+ .timer_data = offsetof(nxt_socket_conf_t, proxy_timeout),
+};
+
+
+static void
+nxt_h1p_peer_connected(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_http_peer_t *peer;
+ nxt_http_request_t *r;
+
+ peer = data;
+
+ nxt_debug(task, "h1p peer connected");
+
+ r = peer->request;
+ r->state->ready_handler(task, r, peer);
+}
+
+
+static void
+nxt_h1p_peer_refused(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_http_peer_t *peer;
+ nxt_http_request_t *r;
+
+ peer = data;
+
+ nxt_debug(task, "h1p peer refused");
+
+ //peer->status = NXT_HTTP_SERVICE_UNAVAILABLE;
+ peer->status = NXT_HTTP_BAD_GATEWAY;
+
+ r = peer->request;
+ r->state->error_handler(task, r, peer);
+}
+
+
+static void
+nxt_h1p_peer_header_send(nxt_task_t *task, nxt_http_peer_t *peer)
+{
+ u_char *p;
+ size_t size;
+ nxt_buf_t *header, *body;
+ nxt_conn_t *c;
+ nxt_http_field_t *field;
+ nxt_http_request_t *r;
+
+ nxt_debug(task, "h1p peer header send");
+
+ r = peer->request;
+
+ size = r->method->length + sizeof(" ") + r->target.length
+ + sizeof(" HTTP/1.0\r\n")
+ + sizeof("\r\n");
+
+ nxt_list_each(field, r->fields) {
+
+ if (!field->hopbyhop) {
+ size += field->name_length + field->value_length;
+ size += nxt_length(": \r\n");
+ }
+
+ } nxt_list_loop;
+
+ header = nxt_http_buf_mem(task, r, size);
+ if (nxt_slow_path(header == NULL)) {
+ r->state->error_handler(task, r, peer);
+ return;
+ }
+
+ p = header->mem.free;
+
+ p = nxt_cpymem(p, r->method->start, r->method->length);
+ *p++ = ' ';
+ p = nxt_cpymem(p, r->target.start, r->target.length);
+ p = nxt_cpymem(p, " HTTP/1.0\r\n", 11);
+
+ nxt_list_each(field, r->fields) {
+
+ if (!field->hopbyhop) {
+ p = nxt_cpymem(p, field->name, field->name_length);
+ *p++ = ':'; *p++ = ' ';
+ p = nxt_cpymem(p, field->value, field->value_length);
+ *p++ = '\r'; *p++ = '\n';
+ }
+
+ } nxt_list_loop;
+
+ *p++ = '\r'; *p++ = '\n';
+ header->mem.free = p;
+ size = p - header->mem.pos;
+
+ c = peer->proto.h1->conn;
+ c->write = header;
+ c->write_state = &nxt_h1p_peer_header_send_state;
+
+ if (r->body != NULL) {
+ body = nxt_buf_mem_alloc(r->mem_pool, 0, 0);
+ if (nxt_slow_path(body == NULL)) {
+ r->state->error_handler(task, r, peer);
+ return;
+ }
+
+ header->next = body;
+
+ body->mem = r->body->mem;
+ size += nxt_buf_mem_used_size(&body->mem);
+
+// nxt_mp_retain(r->mem_pool);
+ }
+
+ if (size > 16384) {
+ /* Use proxy_send_timeout instead of proxy_timeout. */
+ c->write_state = &nxt_h1p_peer_header_body_send_state;
+ }
+
+ nxt_conn_write(task->thread->engine, c);
+}
+
+
+static const nxt_conn_state_t nxt_h1p_peer_header_send_state
+ nxt_aligned(64) =
+{
+ .ready_handler = nxt_h1p_peer_header_sent,
+ .error_handler = nxt_h1p_peer_error,
+
+ .timer_handler = nxt_h1p_peer_send_timeout,
+ .timer_value = nxt_h1p_peer_timer_value,
+ .timer_data = offsetof(nxt_socket_conf_t, proxy_timeout),
+};
+
+
+static const nxt_conn_state_t nxt_h1p_peer_header_body_send_state
+ nxt_aligned(64) =
+{
+ .ready_handler = nxt_h1p_peer_header_sent,
+ .error_handler = nxt_h1p_peer_error,
+
+ .timer_handler = nxt_h1p_peer_send_timeout,
+ .timer_value = nxt_h1p_peer_timer_value,
+ .timer_data = offsetof(nxt_socket_conf_t, proxy_send_timeout),
+ .timer_autoreset = 1,
+};
+
+
+static void
+nxt_h1p_peer_header_sent(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_conn_t *c;
+ nxt_http_peer_t *peer;
+ nxt_http_request_t *r;
+ nxt_event_engine_t *engine;
+
+ c = obj;
+ peer = data;
+
+ nxt_debug(task, "h1p peer header sent");
+
+ engine = task->thread->engine;
+
+ c->write = nxt_sendbuf_completion(task, &engine->fast_work_queue, c->write);
+
+ if (c->write == NULL) {
+ r = peer->request;
+ r->state->ready_handler(task, r, peer);
+ return;
+ }
+
+ nxt_conn_write(engine, c);
+}
+
+
+static void
+nxt_h1p_peer_header_read(nxt_task_t *task, nxt_http_peer_t *peer)
+{
+ nxt_conn_t *c;
+
+ nxt_debug(task, "h1p peer header read");
+
+ c = peer->proto.h1->conn;
+
+ if (c->write_timer.enabled) {
+ c->read_state = &nxt_h1p_peer_header_read_state;
+
+ } else {
+ c->read_state = &nxt_h1p_peer_header_read_timer_state;
+ }
+
+ nxt_conn_read(task->thread->engine, c);
+}
+
+
+static const nxt_conn_state_t nxt_h1p_peer_header_read_state
+ nxt_aligned(64) =
+{
+ .ready_handler = nxt_h1p_peer_header_read_done,
+ .close_handler = nxt_h1p_peer_closed,
+ .error_handler = nxt_h1p_peer_error,
+
+ .io_read_handler = nxt_h1p_peer_io_read_handler,
+};
+
+
+static const nxt_conn_state_t nxt_h1p_peer_header_read_timer_state
+ nxt_aligned(64) =
+{
+ .ready_handler = nxt_h1p_peer_header_read_done,
+ .close_handler = nxt_h1p_peer_closed,
+ .error_handler = nxt_h1p_peer_error,
+
+ .io_read_handler = nxt_h1p_peer_io_read_handler,
+
+ .timer_handler = nxt_h1p_peer_read_timeout,
+ .timer_value = nxt_h1p_peer_timer_value,
+ .timer_data = offsetof(nxt_socket_conf_t, proxy_timeout),
+};
+
+
+static ssize_t
+nxt_h1p_peer_io_read_handler(nxt_task_t *task, nxt_conn_t *c)
+{
+ size_t size;
+ ssize_t n;
+ nxt_buf_t *b;
+ nxt_http_peer_t *peer;
+ nxt_socket_conf_t *skcf;
+ nxt_http_request_t *r;
+
+ peer = c->socket.data;
+ r = peer->request;
+ b = c->read;
+
+ if (b == NULL) {
+ skcf = r->conf->socket_conf;
+
+ size = (peer->header_received) ? skcf->proxy_buffer_size
+ : skcf->proxy_header_buffer_size;
+
+ nxt_debug(task, "h1p peer io read: %z", size);
+
+ b = nxt_http_proxy_buf_mem_alloc(task, r, size);
+ if (nxt_slow_path(b == NULL)) {
+ c->socket.error = NXT_ENOMEM;
+ return NXT_ERROR;
+ }
+ }
+
+ n = c->io->recvbuf(c, b);
+
+ if (n > 0) {
+ c->read = b;
+
+ } else {
+ c->read = NULL;
+ nxt_http_proxy_buf_mem_free(task, r, b);
+ }
+
+ return n;
+}
+
+
+static void
+nxt_h1p_peer_header_read_done(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_int_t ret;
+ nxt_buf_t *b;
+ nxt_conn_t *c;
+ nxt_http_peer_t *peer;
+ nxt_http_request_t *r;
+ nxt_event_engine_t *engine;
+
+ c = obj;
+ peer = data;
+
+ nxt_debug(task, "h1p peer header read done");
+
+ b = c->read;
+
+ ret = nxt_h1p_peer_header_parse(peer, &b->mem);
+
+ r = peer->request;
+
+ ret = nxt_expect(NXT_DONE, ret);
+
+ if (ret != NXT_AGAIN) {
+ engine = task->thread->engine;
+ nxt_timer_disable(engine, &c->write_timer);
+ nxt_timer_disable(engine, &c->read_timer);
+ }
+
+ switch (ret) {
+
+ case NXT_DONE:
+ peer->fields = peer->proto.h1->parser.fields;
+
+ ret = nxt_http_fields_process(peer->fields,
+ &nxt_h1p_peer_fields_hash, r);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ peer->status = NXT_HTTP_INTERNAL_SERVER_ERROR;
+ break;
+ }
+
+ c->read = NULL;
+
+ if (nxt_buf_mem_used_size(&b->mem) != 0) {
+ peer->body = b;
+ }
+
+ peer->header_received = 1;
+
+ r->state->ready_handler(task, r, peer);
+ return;
+
+ case NXT_AGAIN:
+ if (nxt_buf_mem_free_size(&b->mem) != 0) {
+ nxt_conn_read(task->thread->engine, c);
+ return;
+ }
+
+ /* Fall through. */
+
+ default:
+ case NXT_ERROR:
+ case NXT_HTTP_PARSE_INVALID:
+ case NXT_HTTP_PARSE_UNSUPPORTED_VERSION:
+ case NXT_HTTP_PARSE_TOO_LARGE_FIELD:
+ peer->status = NXT_HTTP_BAD_GATEWAY;
+ break;
+ }
+
+ nxt_http_proxy_buf_mem_free(task, r, b);
+
+ r->state->error_handler(task, r, peer);
+}
+
+
+static nxt_int_t
+nxt_h1p_peer_header_parse(nxt_http_peer_t *peer, nxt_buf_mem_t *bm)
+{
+ u_char *p;
+ size_t length;
+ nxt_int_t status;
+
+ if (peer->status < 0) {
+ length = nxt_buf_mem_used_size(bm);
+
+ if (nxt_slow_path(length < 12)) {
+ return NXT_AGAIN;
+ }
+
+ p = bm->pos;
+
+ if (nxt_slow_path(nxt_memcmp(p, "HTTP/1.", 7) != 0
+ || (p[7] != '0' && p[7] != '1')))
+ {
+ return NXT_ERROR;
+ }
+
+ status = nxt_int_parse(&p[9], 3);
+
+ if (nxt_slow_path(status < 0)) {
+ return NXT_ERROR;
+ }
+
+ p += 12;
+ length -= 12;
+
+ p = nxt_memchr(p, '\n', length);
+
+ if (nxt_slow_path(p == NULL)) {
+ return NXT_AGAIN;
+ }
+
+ bm->pos = p + 1;
+ peer->status = status;
+ }
+
+ return nxt_http_parse_fields(&peer->proto.h1->parser, bm);
+}
+
+
+static void
+nxt_h1p_peer_read(nxt_task_t *task, nxt_http_peer_t *peer)
+{
+ nxt_conn_t *c;
+
+ nxt_debug(task, "h1p peer read");
+
+ c = peer->proto.h1->conn;
+ c->read_state = &nxt_h1p_peer_read_state;
+
+ nxt_conn_read(task->thread->engine, c);
+}
+
+
+static const nxt_conn_state_t nxt_h1p_peer_read_state
+ nxt_aligned(64) =
+{
+ .ready_handler = nxt_h1p_peer_read_done,
+ .close_handler = nxt_h1p_peer_closed,
+ .error_handler = nxt_h1p_peer_error,
+
+ .io_read_handler = nxt_h1p_peer_io_read_handler,
+
+ .timer_handler = nxt_h1p_peer_read_timeout,
+ .timer_value = nxt_h1p_peer_timer_value,
+ .timer_data = offsetof(nxt_socket_conf_t, proxy_read_timeout),
+ .timer_autoreset = 1,
+};
+
+
+static void
+nxt_h1p_peer_read_done(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_conn_t *c;
+ nxt_http_peer_t *peer;
+ nxt_http_request_t *r;
+
+ c = obj;
+ peer = data;
+
+ nxt_debug(task, "h1p peer read done");
+
+ peer->body = c->read;
+ c->read = NULL;
+
+ r = peer->request;
+ r->state->ready_handler(task, r, peer);
+}
+
+
+static void
+nxt_h1p_peer_closed(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_http_peer_t *peer;
+ nxt_http_request_t *r;
+
+ peer = data;
+
+ nxt_debug(task, "h1p peer closed");
+
+ r = peer->request;
+
+ if (peer->header_received) {
+ peer->body = nxt_http_buf_last(r);
+
+ peer->closed = 1;
+
+ r->state->ready_handler(task, r, peer);
+
+ } else {
+ peer->status = NXT_HTTP_BAD_GATEWAY;
+
+ r->state->error_handler(task, r, peer);
+ }
+}
+
+
+static void
+nxt_h1p_peer_error(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_http_peer_t *peer;
+ nxt_http_request_t *r;
+
+ peer = data;
+
+ nxt_debug(task, "h1p peer error");
+
+ peer->status = NXT_HTTP_BAD_GATEWAY;
+
+ r = peer->request;
+ r->state->error_handler(task, r, peer);
+}
+
+
+static void
+nxt_h1p_peer_send_timeout(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_conn_t *c;
+ nxt_timer_t *timer;
+ nxt_http_peer_t *peer;
+ nxt_http_request_t *r;
+
+ timer = obj;
+
+ nxt_debug(task, "h1p peer send timeout");
+
+ c = nxt_write_timer_conn(timer);
+ c->block_write = 1;
+ c->block_read = 1;
+
+ peer = c->socket.data;
+ peer->status = NXT_HTTP_GATEWAY_TIMEOUT;
+
+ r = peer->request;
+ r->state->error_handler(task, r, peer);
+}
+
+
+static void
+nxt_h1p_peer_read_timeout(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_conn_t *c;
+ nxt_timer_t *timer;
+ nxt_http_peer_t *peer;
+ nxt_http_request_t *r;
+
+ timer = obj;
+
+ nxt_debug(task, "h1p peer read timeout");
+
+ c = nxt_read_timer_conn(timer);
+ c->block_write = 1;
+ c->block_read = 1;
+
+ peer = c->socket.data;
+ peer->status = NXT_HTTP_GATEWAY_TIMEOUT;
+
+ r = peer->request;
+ r->state->error_handler(task, r, peer);
+}
+
+
+static nxt_msec_t
+nxt_h1p_peer_timer_value(nxt_conn_t *c, uintptr_t data)
+{
+ nxt_http_peer_t *peer;
+
+ peer = c->socket.data;
+
+ return nxt_value_at(nxt_msec_t, peer->request->conf->socket_conf, data);
+}
+
+
+static void
+nxt_h1p_peer_close(nxt_task_t *task, nxt_http_peer_t *peer)
+{
+ nxt_conn_t *c;
+
+ nxt_debug(task, "h1p peer close");
+
+ peer->closed = 1;
+
+ c = peer->proto.h1->conn;
+ task = &c->task;
+ c->socket.task = task;
+ c->read_timer.task = task;
+ c->write_timer.task = task;
+
+ if (c->socket.fd != -1) {
+ c->write_state = &nxt_h1p_peer_close_state;
+
+ nxt_conn_close(task->thread->engine, c);
+
+ } else {
+ nxt_h1p_peer_free(task, c, NULL);
+ }
+}
+
+
+static const nxt_conn_state_t nxt_h1p_peer_close_state
+ nxt_aligned(64) =
+{
+ .ready_handler = nxt_h1p_peer_free,
+};
+
+
+static void
+nxt_h1p_peer_free(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_conn_t *c;
+
+ c = obj;
+
+ nxt_debug(task, "h1p peer free");
+
+ nxt_conn_free(task, c);
+}