diff options
Diffstat (limited to 'src/nxt_h1proto.c')
-rw-r--r-- | src/nxt_h1proto.c | 431 |
1 files changed, 307 insertions, 124 deletions
diff --git a/src/nxt_h1proto.c b/src/nxt_h1proto.c index 3a822042..a60bdb36 100644 --- a/src/nxt_h1proto.c +++ b/src/nxt_h1proto.c @@ -6,6 +6,9 @@ #include <nxt_router.h> #include <nxt_http.h> +#include <nxt_h1proto.h> +#include <nxt_websocket.h> +#include <nxt_websocket_header.h> /* @@ -23,19 +26,24 @@ static void nxt_h1p_conn_proto_init(nxt_task_t *task, void *obj, void *data); static void nxt_h1p_conn_request_init(nxt_task_t *task, void *obj, void *data); static void nxt_h1p_conn_request_header_parse(nxt_task_t *task, void *obj, void *data); -static nxt_int_t nxt_h1p_header_process(nxt_h1proto_t *h1p, +static nxt_int_t nxt_h1p_header_process(nxt_task_t *task, nxt_h1proto_t *h1p, nxt_http_request_t *r); static nxt_int_t nxt_h1p_header_buffer_test(nxt_task_t *task, nxt_h1proto_t *h1p, nxt_conn_t *c, nxt_socket_conf_t *skcf); static nxt_int_t nxt_h1p_connection(void *ctx, nxt_http_field_t *field, uintptr_t data); +static nxt_int_t nxt_h1p_upgrade(void *ctx, nxt_http_field_t *field, + uintptr_t data); +static nxt_int_t nxt_h1p_websocket_key(void *ctx, nxt_http_field_t *field, + uintptr_t data); +static nxt_int_t nxt_h1p_websocket_version(void *ctx, nxt_http_field_t *field, + uintptr_t data); static nxt_int_t nxt_h1p_transfer_encoding(void *ctx, nxt_http_field_t *field, uintptr_t data); static void nxt_h1p_request_body_read(nxt_task_t *task, nxt_http_request_t *r); static void nxt_h1p_conn_request_body_read(nxt_task_t *task, void *obj, void *data); static void nxt_h1p_request_local_addr(nxt_task_t *task, nxt_http_request_t *r); -static void nxt_h1p_request_tls(nxt_task_t *task, nxt_http_request_t *r); static void nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r); static void nxt_h1p_request_send(nxt_task_t *task, nxt_http_request_t *r, @@ -51,8 +59,6 @@ static void nxt_h1p_conn_request_timeout(nxt_task_t *task, void *obj, void *data); static void nxt_h1p_conn_request_send_timeout(nxt_task_t *task, void *obj, void *data); -static nxt_msec_t nxt_h1p_conn_request_timer_value(nxt_conn_t *c, - uintptr_t data); nxt_inline void nxt_h1p_request_error(nxt_task_t *task, nxt_h1proto_t *h1p, nxt_http_request_t *r); static void nxt_h1p_request_close(nxt_task_t *task, nxt_http_proto_t proto, @@ -72,16 +78,16 @@ static void nxt_h1p_idle_response_timeout(nxt_task_t *task, void *obj, static nxt_msec_t nxt_h1p_idle_response_timer_value(nxt_conn_t *c, uintptr_t data); static void nxt_h1p_shutdown(nxt_task_t *task, nxt_conn_t *c); +static void nxt_h1p_shutdown_(nxt_task_t *task, nxt_conn_t *c); +static void nxt_h1p_conn_ws_shutdown(nxt_task_t *task, void *obj, void *data); static void nxt_h1p_conn_closing(nxt_task_t *task, void *obj, void *data); static void nxt_h1p_conn_free(nxt_task_t *task, void *obj, void *data); - #if (NXT_TLS) static const nxt_conn_state_t nxt_http_idle_state; static const nxt_conn_state_t nxt_h1p_shutdown_state; #endif static const nxt_conn_state_t nxt_h1p_idle_state; -static const nxt_conn_state_t nxt_h1p_idle_close_state; static const nxt_conn_state_t nxt_h1p_header_parse_state; static const nxt_conn_state_t nxt_h1p_read_body_state; static const nxt_conn_state_t nxt_h1p_request_send_state; @@ -90,59 +96,20 @@ static const nxt_conn_state_t nxt_h1p_keepalive_state; static const nxt_conn_state_t nxt_h1p_close_state; -const nxt_http_proto_body_read_t nxt_http_proto_body_read[3] = { - nxt_h1p_request_body_read, - NULL, - NULL, -}; - - -const nxt_http_proto_local_addr_t nxt_http_proto_local_addr[3] = { - nxt_h1p_request_local_addr, - NULL, - NULL, -}; - - -const nxt_http_proto_tls_t nxt_http_proto_tls[3] = { - nxt_h1p_request_tls, - NULL, - NULL, -}; - - -const nxt_http_proto_header_send_t nxt_http_proto_header_send[3] = { - nxt_h1p_request_header_send, - NULL, - NULL, -}; - - -const nxt_http_proto_send_t nxt_http_proto_send[3] = { - nxt_h1p_request_send, - NULL, - NULL, -}; - - -const nxt_http_proto_body_bytes_sent_t nxt_http_proto_body_bytes_sent[3] = { - nxt_h1p_request_body_bytes_sent, - NULL, - NULL, -}; - - -const nxt_http_proto_discard_t nxt_http_proto_discard[3] = { - nxt_h1p_request_discard, - NULL, - NULL, -}; - - -const nxt_http_proto_close_t nxt_http_proto_close[3] = { - nxt_h1p_request_close, - NULL, - NULL, +const nxt_http_proto_table_t nxt_http_proto[3] = { + /* NXT_HTTP_PROTO_H1 */ + { + .body_read = nxt_h1p_request_body_read, + .local_addr = nxt_h1p_request_local_addr, + .header_send = nxt_h1p_request_header_send, + .send = nxt_h1p_request_send, + .body_bytes_sent = nxt_h1p_request_body_bytes_sent, + .discard = nxt_h1p_request_discard, + .close = nxt_h1p_request_close, + .ws_frame_start = nxt_h1p_websocket_frame_start, + }, + /* NXT_HTTP_PROTO_H2 */ + /* NXT_HTTP_PROTO_DEVNULL */ }; @@ -150,6 +117,10 @@ static nxt_lvlhsh_t nxt_h1p_fields_hash; static nxt_http_field_proc_t nxt_h1p_fields[] = { { nxt_string("Connection"), &nxt_h1p_connection, 0 }, + { nxt_string("Upgrade"), &nxt_h1p_upgrade, 0 }, + { nxt_string("Sec-WebSocket-Key"), &nxt_h1p_websocket_key, 0 }, + { nxt_string("Sec-WebSocket-Version"), + &nxt_h1p_websocket_version, 0 }, { nxt_string("Transfer-Encoding"), &nxt_h1p_transfer_encoding, 0 }, { nxt_string("Host"), &nxt_http_request_host, 0 }, @@ -446,8 +417,13 @@ nxt_h1p_conn_request_init(nxt_task_t *task, void *obj, void *data) h1p->request = r; r->proto.h1 = h1p; + /* r->protocol = NXT_HTTP_PROTO_H1 is done by zeroing. */ r->remote = c->remote; +#if (NXT_TLS) + r->tls = c->u.tls; +#endif + ret = nxt_http_parse_request_init(&h1p->parser, r->mem_pool); if (nxt_fast_path(ret == NXT_OK)) { @@ -519,7 +495,7 @@ nxt_h1p_conn_request_header_parse(nxt_task_t *task, void *obj, void *data) */ h1p->keepalive = (h1p->parser.version.s.minor != '0'); - ret = nxt_h1p_header_process(h1p, r); + ret = nxt_h1p_header_process(task, h1p, r); if (nxt_fast_path(ret == NXT_OK)) { @@ -567,7 +543,7 @@ nxt_h1p_conn_request_header_parse(nxt_task_t *task, void *obj, void *data) break; } - (void) nxt_h1p_header_process(h1p, r); + (void) nxt_h1p_header_process(task, h1p, r); error: @@ -578,8 +554,12 @@ error: static nxt_int_t -nxt_h1p_header_process(nxt_h1proto_t *h1p, nxt_http_request_t *r) +nxt_h1p_header_process(nxt_task_t *task, nxt_h1proto_t *h1p, + nxt_http_request_t *r) { + u_char *m; + nxt_int_t ret; + r->target.start = h1p->parser.target_start; r->target.length = h1p->parser.target_end - h1p->parser.target_start; @@ -594,7 +574,46 @@ nxt_h1p_header_process(nxt_h1proto_t *h1p, nxt_http_request_t *r) r->fields = h1p->parser.fields; - return nxt_http_fields_process(r->fields, &nxt_h1p_fields_hash, r); + ret = nxt_http_fields_process(r->fields, &nxt_h1p_fields_hash, r); + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } + + if (h1p->connection_upgrade && h1p->upgrade_websocket) { + m = h1p->parser.method.start; + + if (nxt_slow_path(h1p->parser.method.length != 3 + || m[0] != 'G' + || m[1] != 'E' + || m[2] != 'T')) + { + nxt_log(task, NXT_LOG_INFO, "h1p upgrade: bad method"); + + return NXT_HTTP_BAD_REQUEST; + } + + if (nxt_slow_path(h1p->parser.version.s.minor != '1')) { + nxt_log(task, NXT_LOG_INFO, "h1p upgrade: bad protocol version"); + + return NXT_HTTP_BAD_REQUEST; + } + + if (nxt_slow_path(h1p->websocket_key == NULL)) { + nxt_log(task, NXT_LOG_INFO, "h1p upgrade: bad or absent websocket key"); + + return NXT_HTTP_BAD_REQUEST; + } + + if (nxt_slow_path(h1p->websocket_version_ok == 0)) { + nxt_log(task, NXT_LOG_INFO, "h1p upgrade: bad or absent websocket version"); + + return NXT_HTTP_UPGRADE_REQUIRED; + } + + r->websocket_handshake = 1; + } + + return ret; } @@ -636,12 +655,68 @@ nxt_h1p_header_buffer_test(nxt_task_t *task, nxt_h1proto_t *h1p, nxt_conn_t *c, static nxt_int_t nxt_h1p_connection(void *ctx, nxt_http_field_t *field, uintptr_t data) { - nxt_http_request_t *r; + nxt_http_request_t *r; + static const u_char *upgrade = (const u_char *) "upgrade"; r = ctx; if (field->value_length == 5 && nxt_memcmp(field->value, "close", 5) == 0) { r->proto.h1->keepalive = 0; + + } else if (field->value_length == 7 + && nxt_memcasecmp(field->value, upgrade, 7) == 0) + { + r->proto.h1->connection_upgrade = 1; + } + + return NXT_OK; +} + + +static nxt_int_t +nxt_h1p_upgrade(void *ctx, nxt_http_field_t *field, uintptr_t data) +{ + nxt_http_request_t *r; + static const u_char *websocket = (const u_char *) "websocket"; + + r = ctx; + + if (field->value_length == 9 + && nxt_memcasecmp(field->value, websocket, 9) == 0) + { + r->proto.h1->upgrade_websocket = 1; + } + + return NXT_OK; +} + + +static nxt_int_t +nxt_h1p_websocket_key(void *ctx, nxt_http_field_t *field, uintptr_t data) +{ + nxt_http_request_t *r; + + r = ctx; + + if (field->value_length == 24) { + r->proto.h1->websocket_key = field; + } + + return NXT_OK; +} + + +static nxt_int_t +nxt_h1p_websocket_version(void *ctx, nxt_http_field_t *field, uintptr_t data) +{ + nxt_http_request_t *r; + + r = ctx; + + if (field->value_length == 2 + && field->value[0] == '1' && field->value[1] == '3') + { + r->proto.h1->websocket_version_ok = 1; } return NXT_OK; @@ -743,6 +818,7 @@ nxt_h1p_request_body_read(nxt_task_t *task, nxt_http_request_t *r) if (size != 0) { in->next = h1p->buffers; h1p->buffers = in; + h1p->nbuffers++; c = h1p->conn; c->read = b; @@ -821,13 +897,13 @@ nxt_h1p_request_local_addr(nxt_task_t *task, nxt_http_request_t *r) } -static void -nxt_h1p_request_tls(nxt_task_t *task, nxt_http_request_t *r) -{ -#if (NXT_TLS) - r->tls = r->proto.h1->conn->u.tls; -#endif -} +#define NXT_HTTP_LAST_INFORMATIONAL \ + (NXT_HTTP_CONTINUE + nxt_nitems(nxt_http_informational) - 1) + +static const nxt_str_t nxt_http_informational[] = { + nxt_string("HTTP/1.1 100 Continue\r\n"), + nxt_string("HTTP/1.1 101 Switching Protocols\r\n"), +}; #define NXT_HTTP_LAST_SUCCESS \ @@ -886,7 +962,7 @@ static const nxt_str_t nxt_http_client_error[] = { nxt_string("HTTP/1.1 423\r\n"), nxt_string("HTTP/1.1 424\r\n"), nxt_string("HTTP/1.1 425\r\n"), - nxt_string("HTTP/1.1 426\r\n"), + nxt_string("HTTP/1.1 426 Upgrade Required\r\n"), nxt_string("HTTP/1.1 427\r\n"), nxt_string("HTTP/1.1 428\r\n"), nxt_string("HTTP/1.1 429\r\n"), @@ -933,14 +1009,17 @@ nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r) nxt_h1proto_t *h1p; const nxt_str_t *status; nxt_http_field_t *field; - nxt_event_engine_t *engine; u_char buf[UNKNOWN_STATUS_LENGTH]; static const char chunked[] = "Transfer-Encoding: chunked\r\n"; + static const char websocket_version[] = "Sec-WebSocket-Version: 13\r\n"; - static const nxt_str_t connection[2] = { + static const nxt_str_t connection[3] = { nxt_string("Connection: close\r\n"), nxt_string("Connection: keep-alive\r\n"), + nxt_string("Upgrade: websocket\r\n" + "Connection: Upgrade\r\n" + "Sec-WebSocket-Accept: "), }; nxt_debug(task, "h1p request header send"); @@ -949,7 +1028,10 @@ nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r) h1p = r->proto.h1; n = r->status; - if (n >= NXT_HTTP_OK && n <= NXT_HTTP_LAST_SUCCESS) { + if (n >= NXT_HTTP_CONTINUE && n <= NXT_HTTP_LAST_INFORMATIONAL) { + status = &nxt_http_informational[n - NXT_HTTP_CONTINUE]; + + } else if (n >= NXT_HTTP_OK && n <= NXT_HTTP_LAST_SUCCESS) { status = &nxt_http_success[n - NXT_HTTP_OK]; } else if (n >= NXT_HTTP_MULTIPLE_CHOICES @@ -981,27 +1063,41 @@ nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r) /* Trailing CRLF at the end of header. */ size += nxt_length("\r\n"); - http11 = (h1p->parser.version.s.minor != '0'); + conn = -1; + + if (r->websocket_handshake && n == NXT_HTTP_SWITCHING_PROTOCOLS) { + h1p->websocket = 1; + h1p->keepalive = 0; + conn = 2; + size += NXT_WEBSOCKET_ACCEPT_SIZE + 2; + + } else { + http11 = (h1p->parser.version.s.minor != '0'); + + if (r->resp.content_length == NULL || r->resp.content_length->skip) { - if (r->resp.content_length == NULL || r->resp.content_length->skip) { + if (http11) { + if (n != NXT_HTTP_NOT_MODIFIED + && n != NXT_HTTP_NO_CONTENT + && !h1p->websocket) + { + h1p->chunked = 1; + size += nxt_length(chunked); + /* Trailing CRLF will be added by the first chunk header. */ + size -= nxt_length("\r\n"); + } - if (http11) { - if (n != NXT_HTTP_NOT_MODIFIED && n != NXT_HTTP_NO_CONTENT) { - h1p->chunked = 1; - size += nxt_length(chunked); - /* Trailing CRLF will be added by the first chunk header. */ - size -= nxt_length("\r\n"); + } else { + h1p->keepalive = 0; } + } - } else { - h1p->keepalive = 0; + if (http11 ^ h1p->keepalive) { + conn = h1p->keepalive; } } - conn = -1; - - if (http11 ^ h1p->keepalive) { - conn = h1p->keepalive; + if (conn >= 0) { size += connection[conn].length; } @@ -1014,15 +1110,17 @@ nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r) } nxt_list_loop; + if (nxt_slow_path(n == NXT_HTTP_UPGRADE_REQUIRED)) { + size += nxt_length(websocket_version); + } + header = nxt_http_buf_mem(task, r, size); if (nxt_slow_path(header == NULL)) { nxt_h1p_request_error(task, h1p, r); return; } - p = header->mem.free; - - p = nxt_cpymem(p, status->start, status->length); + p = nxt_cpymem(header->mem.free, status->start, status->length); nxt_list_each(field, r->resp.fields) { @@ -1039,6 +1137,17 @@ nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r) p = nxt_cpymem(p, connection[conn].start, connection[conn].length); } + if (h1p->websocket) { + nxt_websocket_accept(p, h1p->websocket_key->value); + p += NXT_WEBSOCKET_ACCEPT_SIZE; + + *p++ = '\r'; *p++ = '\n'; + } + + if (nxt_slow_path(n == NXT_HTTP_UPGRADE_REQUIRED)) { + p = nxt_cpymem(p, websocket_version, nxt_length(websocket_version)); + } + if (h1p->chunked) { p = nxt_cpymem(p, chunked, nxt_length(chunked)); /* Trailing CRLF will be added by the first chunk header. */ @@ -1056,12 +1165,59 @@ nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r) c->write = header; c->write_state = &nxt_h1p_request_send_state; - engine = task->thread->engine; + nxt_conn_write(task->thread->engine, c); - nxt_work_queue_add(&engine->fast_work_queue, r->state->ready_handler, - task, r, NULL); + if (h1p->websocket) { + nxt_h1p_websocket_first_frame_start(task, r, c->read); + } +} + + +void +nxt_h1p_complete_buffers(nxt_task_t *task, nxt_h1proto_t *h1p) +{ + size_t size; + nxt_buf_t *b, *in, *next; + nxt_conn_t *c; + + nxt_debug(task, "h1p complete buffers"); + + b = h1p->buffers; + c = h1p->conn; + in = c->read; - nxt_conn_write(engine, c); + if (b != NULL) { + if (in == NULL) { + /* A request with large body. */ + in = b; + c->read = in; + + b = in->next; + in->next = NULL; + } + + while (b != NULL) { + next = b->next; + + nxt_work_queue_add(&task->thread->engine->fast_work_queue, + b->completion_handler, task, b, b->parent); + + b = next; + } + + h1p->buffers = NULL; + h1p->nbuffers = 0; + } + + if (in != NULL) { + size = nxt_buf_mem_used_size(&in->mem); + + if (size == 0) { + nxt_mp_free(c->mem_pool, in); + + c->read = NULL; + } + } } @@ -1212,8 +1368,13 @@ nxt_h1p_conn_request_error(nxt_task_t *task, void *obj, void *data) r = h1p->request; + if (nxt_slow_path(r == NULL)) { + nxt_h1p_shutdown(task, h1p->conn); + return; + } + if (r->fields == NULL) { - (void) nxt_h1p_header_process(h1p, r); + (void) nxt_h1p_header_process(task, h1p, r); } if (r->status == 0) { @@ -1249,7 +1410,7 @@ nxt_h1p_conn_request_timeout(nxt_task_t *task, void *obj, void *data) r = h1p->request; if (r->fields == NULL) { - (void) nxt_h1p_header_process(h1p, r); + (void) nxt_h1p_header_process(task, h1p, r); } nxt_http_request_error(task, r, NXT_HTTP_REQUEST_TIMEOUT); @@ -1275,7 +1436,7 @@ nxt_h1p_conn_request_send_timeout(nxt_task_t *task, void *obj, void *data) } -static nxt_msec_t +nxt_msec_t nxt_h1p_conn_request_timer_value(nxt_conn_t *c, uintptr_t data) { nxt_h1proto_t *h1p; @@ -1382,7 +1543,7 @@ static void nxt_h1p_keepalive(nxt_task_t *task, nxt_h1proto_t *h1p, nxt_conn_t *c) { size_t size; - nxt_buf_t *in, *b, *next; + nxt_buf_t *in; nxt_debug(task, "h1p keepalive"); @@ -1390,40 +1551,22 @@ nxt_h1p_keepalive(nxt_task_t *task, nxt_h1proto_t *h1p, nxt_conn_t *c) nxt_conn_tcp_nodelay_on(task, c); } - b = h1p->buffers; + nxt_h1p_complete_buffers(task, h1p); + + in = c->read; nxt_memzero(h1p, offsetof(nxt_h1proto_t, conn)); c->sent = 0; - in = c->read; - if (in == NULL) { - /* A request with large body. */ - in = b; - c->read = in; - - b = in->next; - in->next = NULL; - } - - while (b != NULL) { - next = b->next; - nxt_mp_free(c->mem_pool, b); - b = next; - } - - size = nxt_buf_mem_used_size(&in->mem); - - if (size == 0) { - nxt_mp_free(c->mem_pool, in); - - c->read = NULL; c->read_state = &nxt_h1p_keepalive_state; nxt_conn_read(task->thread->engine, c); } else { + size = nxt_buf_mem_used_size(&in->mem); + nxt_debug(task, "h1p pipelining"); nxt_memmove(in->mem.start, in->mem.pos, size); @@ -1452,7 +1595,7 @@ static const nxt_conn_state_t nxt_h1p_keepalive_state }; -static const nxt_conn_state_t nxt_h1p_idle_close_state +const nxt_conn_state_t nxt_h1p_idle_close_state nxt_aligned(64) = { .close_handler = nxt_h1p_idle_close, @@ -1595,8 +1738,33 @@ nxt_h1p_idle_response_timer_value(nxt_conn_t *c, uintptr_t data) static void nxt_h1p_shutdown(nxt_task_t *task, nxt_conn_t *c) { + nxt_timer_t *timer; + nxt_h1proto_t *h1p; + nxt_debug(task, "h1p shutdown"); + h1p = c->socket.data; + + if (nxt_slow_path(h1p != NULL && h1p->websocket_timer != NULL)) { + timer = &h1p->websocket_timer->timer; + + if (timer->handler != nxt_h1p_conn_ws_shutdown) { + timer->handler = nxt_h1p_conn_ws_shutdown; + nxt_timer_add(task->thread->engine, timer, 0); + + } else { + nxt_debug(task, "h1p already scheduled ws shutdown"); + } + + } else { + nxt_h1p_shutdown_(task, c); + } +} + + +static void +nxt_h1p_shutdown_(nxt_task_t *task, nxt_conn_t *c) +{ c->socket.data = NULL; #if (NXT_TLS) @@ -1628,6 +1796,21 @@ static const nxt_conn_state_t nxt_h1p_shutdown_state static void +nxt_h1p_conn_ws_shutdown(nxt_task_t *task, void *obj, void *data) +{ + nxt_timer_t *timer; + nxt_h1p_websocket_timer_t *ws_timer; + + nxt_debug(task, "h1p conn ws shutdown"); + + timer = obj; + ws_timer = nxt_timer_data(timer, nxt_h1p_websocket_timer_t, timer); + + nxt_h1p_shutdown_(task, ws_timer->h1p->conn); +} + + +static void nxt_h1p_conn_closing(nxt_task_t *task, void *obj, void *data) { nxt_conn_t *c; |