diff options
Diffstat (limited to '')
-rw-r--r-- | src/nxt_h1proto.c | 370 |
1 files changed, 292 insertions, 78 deletions
diff --git a/src/nxt_h1proto.c b/src/nxt_h1proto.c index a92dfe3b..a60bdb36 100644 --- a/src/nxt_h1proto.c +++ b/src/nxt_h1proto.c @@ -6,26 +6,9 @@ #include <nxt_router.h> #include <nxt_http.h> - - -struct nxt_h1proto_s { - nxt_http_request_parse_t parser; - - uint8_t nbuffers; - uint8_t keepalive; /* 1 bit */ - uint8_t chunked; /* 1 bit */ - nxt_http_te_t transfer_encoding:8; /* 2 bits */ - - uint32_t header_size; - - nxt_http_request_t *request; - nxt_buf_t *buffers; - /* - * All fields before the conn field will - * be zeroed in a keep-alive connection. - */ - nxt_conn_t *conn; -}; +#include <nxt_h1proto.h> +#include <nxt_websocket.h> +#include <nxt_websocket_header.h> /* @@ -43,12 +26,18 @@ static void nxt_h1p_conn_proto_init(nxt_task_t *task, void *obj, void *data); static void nxt_h1p_conn_request_init(nxt_task_t *task, void *obj, void *data); static void nxt_h1p_conn_request_header_parse(nxt_task_t *task, void *obj, void *data); -static nxt_int_t nxt_h1p_header_process(nxt_h1proto_t *h1p, +static nxt_int_t nxt_h1p_header_process(nxt_task_t *task, nxt_h1proto_t *h1p, nxt_http_request_t *r); static nxt_int_t nxt_h1p_header_buffer_test(nxt_task_t *task, nxt_h1proto_t *h1p, nxt_conn_t *c, nxt_socket_conf_t *skcf); static nxt_int_t nxt_h1p_connection(void *ctx, nxt_http_field_t *field, uintptr_t data); +static nxt_int_t nxt_h1p_upgrade(void *ctx, nxt_http_field_t *field, + uintptr_t data); +static nxt_int_t nxt_h1p_websocket_key(void *ctx, nxt_http_field_t *field, + uintptr_t data); +static nxt_int_t nxt_h1p_websocket_version(void *ctx, nxt_http_field_t *field, + uintptr_t data); static nxt_int_t nxt_h1p_transfer_encoding(void *ctx, nxt_http_field_t *field, uintptr_t data); static void nxt_h1p_request_body_read(nxt_task_t *task, nxt_http_request_t *r); @@ -70,8 +59,6 @@ static void nxt_h1p_conn_request_timeout(nxt_task_t *task, void *obj, void *data); static void nxt_h1p_conn_request_send_timeout(nxt_task_t *task, void *obj, void *data); -static nxt_msec_t nxt_h1p_conn_request_timer_value(nxt_conn_t *c, - uintptr_t data); nxt_inline void nxt_h1p_request_error(nxt_task_t *task, nxt_h1proto_t *h1p, nxt_http_request_t *r); static void nxt_h1p_request_close(nxt_task_t *task, nxt_http_proto_t proto, @@ -91,16 +78,16 @@ static void nxt_h1p_idle_response_timeout(nxt_task_t *task, void *obj, static nxt_msec_t nxt_h1p_idle_response_timer_value(nxt_conn_t *c, uintptr_t data); static void nxt_h1p_shutdown(nxt_task_t *task, nxt_conn_t *c); +static void nxt_h1p_shutdown_(nxt_task_t *task, nxt_conn_t *c); +static void nxt_h1p_conn_ws_shutdown(nxt_task_t *task, void *obj, void *data); static void nxt_h1p_conn_closing(nxt_task_t *task, void *obj, void *data); static void nxt_h1p_conn_free(nxt_task_t *task, void *obj, void *data); - #if (NXT_TLS) static const nxt_conn_state_t nxt_http_idle_state; static const nxt_conn_state_t nxt_h1p_shutdown_state; #endif static const nxt_conn_state_t nxt_h1p_idle_state; -static const nxt_conn_state_t nxt_h1p_idle_close_state; static const nxt_conn_state_t nxt_h1p_header_parse_state; static const nxt_conn_state_t nxt_h1p_read_body_state; static const nxt_conn_state_t nxt_h1p_request_send_state; @@ -119,6 +106,7 @@ const nxt_http_proto_table_t nxt_http_proto[3] = { .body_bytes_sent = nxt_h1p_request_body_bytes_sent, .discard = nxt_h1p_request_discard, .close = nxt_h1p_request_close, + .ws_frame_start = nxt_h1p_websocket_frame_start, }, /* NXT_HTTP_PROTO_H2 */ /* NXT_HTTP_PROTO_DEVNULL */ @@ -129,6 +117,10 @@ static nxt_lvlhsh_t nxt_h1p_fields_hash; static nxt_http_field_proc_t nxt_h1p_fields[] = { { nxt_string("Connection"), &nxt_h1p_connection, 0 }, + { nxt_string("Upgrade"), &nxt_h1p_upgrade, 0 }, + { nxt_string("Sec-WebSocket-Key"), &nxt_h1p_websocket_key, 0 }, + { nxt_string("Sec-WebSocket-Version"), + &nxt_h1p_websocket_version, 0 }, { nxt_string("Transfer-Encoding"), &nxt_h1p_transfer_encoding, 0 }, { nxt_string("Host"), &nxt_http_request_host, 0 }, @@ -503,7 +495,7 @@ nxt_h1p_conn_request_header_parse(nxt_task_t *task, void *obj, void *data) */ h1p->keepalive = (h1p->parser.version.s.minor != '0'); - ret = nxt_h1p_header_process(h1p, r); + ret = nxt_h1p_header_process(task, h1p, r); if (nxt_fast_path(ret == NXT_OK)) { @@ -551,7 +543,7 @@ nxt_h1p_conn_request_header_parse(nxt_task_t *task, void *obj, void *data) break; } - (void) nxt_h1p_header_process(h1p, r); + (void) nxt_h1p_header_process(task, h1p, r); error: @@ -562,8 +554,12 @@ error: static nxt_int_t -nxt_h1p_header_process(nxt_h1proto_t *h1p, nxt_http_request_t *r) +nxt_h1p_header_process(nxt_task_t *task, nxt_h1proto_t *h1p, + nxt_http_request_t *r) { + u_char *m; + nxt_int_t ret; + r->target.start = h1p->parser.target_start; r->target.length = h1p->parser.target_end - h1p->parser.target_start; @@ -578,7 +574,46 @@ nxt_h1p_header_process(nxt_h1proto_t *h1p, nxt_http_request_t *r) r->fields = h1p->parser.fields; - return nxt_http_fields_process(r->fields, &nxt_h1p_fields_hash, r); + ret = nxt_http_fields_process(r->fields, &nxt_h1p_fields_hash, r); + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } + + if (h1p->connection_upgrade && h1p->upgrade_websocket) { + m = h1p->parser.method.start; + + if (nxt_slow_path(h1p->parser.method.length != 3 + || m[0] != 'G' + || m[1] != 'E' + || m[2] != 'T')) + { + nxt_log(task, NXT_LOG_INFO, "h1p upgrade: bad method"); + + return NXT_HTTP_BAD_REQUEST; + } + + if (nxt_slow_path(h1p->parser.version.s.minor != '1')) { + nxt_log(task, NXT_LOG_INFO, "h1p upgrade: bad protocol version"); + + return NXT_HTTP_BAD_REQUEST; + } + + if (nxt_slow_path(h1p->websocket_key == NULL)) { + nxt_log(task, NXT_LOG_INFO, "h1p upgrade: bad or absent websocket key"); + + return NXT_HTTP_BAD_REQUEST; + } + + if (nxt_slow_path(h1p->websocket_version_ok == 0)) { + nxt_log(task, NXT_LOG_INFO, "h1p upgrade: bad or absent websocket version"); + + return NXT_HTTP_UPGRADE_REQUIRED; + } + + r->websocket_handshake = 1; + } + + return ret; } @@ -620,12 +655,68 @@ nxt_h1p_header_buffer_test(nxt_task_t *task, nxt_h1proto_t *h1p, nxt_conn_t *c, static nxt_int_t nxt_h1p_connection(void *ctx, nxt_http_field_t *field, uintptr_t data) { - nxt_http_request_t *r; + nxt_http_request_t *r; + static const u_char *upgrade = (const u_char *) "upgrade"; r = ctx; if (field->value_length == 5 && nxt_memcmp(field->value, "close", 5) == 0) { r->proto.h1->keepalive = 0; + + } else if (field->value_length == 7 + && nxt_memcasecmp(field->value, upgrade, 7) == 0) + { + r->proto.h1->connection_upgrade = 1; + } + + return NXT_OK; +} + + +static nxt_int_t +nxt_h1p_upgrade(void *ctx, nxt_http_field_t *field, uintptr_t data) +{ + nxt_http_request_t *r; + static const u_char *websocket = (const u_char *) "websocket"; + + r = ctx; + + if (field->value_length == 9 + && nxt_memcasecmp(field->value, websocket, 9) == 0) + { + r->proto.h1->upgrade_websocket = 1; + } + + return NXT_OK; +} + + +static nxt_int_t +nxt_h1p_websocket_key(void *ctx, nxt_http_field_t *field, uintptr_t data) +{ + nxt_http_request_t *r; + + r = ctx; + + if (field->value_length == 24) { + r->proto.h1->websocket_key = field; + } + + return NXT_OK; +} + + +static nxt_int_t +nxt_h1p_websocket_version(void *ctx, nxt_http_field_t *field, uintptr_t data) +{ + nxt_http_request_t *r; + + r = ctx; + + if (field->value_length == 2 + && field->value[0] == '1' && field->value[1] == '3') + { + r->proto.h1->websocket_version_ok = 1; } return NXT_OK; @@ -727,6 +818,7 @@ nxt_h1p_request_body_read(nxt_task_t *task, nxt_http_request_t *r) if (size != 0) { in->next = h1p->buffers; h1p->buffers = in; + h1p->nbuffers++; c = h1p->conn; c->read = b; @@ -805,6 +897,15 @@ nxt_h1p_request_local_addr(nxt_task_t *task, nxt_http_request_t *r) } +#define NXT_HTTP_LAST_INFORMATIONAL \ + (NXT_HTTP_CONTINUE + nxt_nitems(nxt_http_informational) - 1) + +static const nxt_str_t nxt_http_informational[] = { + nxt_string("HTTP/1.1 100 Continue\r\n"), + nxt_string("HTTP/1.1 101 Switching Protocols\r\n"), +}; + + #define NXT_HTTP_LAST_SUCCESS \ (NXT_HTTP_OK + nxt_nitems(nxt_http_success) - 1) @@ -861,7 +962,7 @@ static const nxt_str_t nxt_http_client_error[] = { nxt_string("HTTP/1.1 423\r\n"), nxt_string("HTTP/1.1 424\r\n"), nxt_string("HTTP/1.1 425\r\n"), - nxt_string("HTTP/1.1 426\r\n"), + nxt_string("HTTP/1.1 426 Upgrade Required\r\n"), nxt_string("HTTP/1.1 427\r\n"), nxt_string("HTTP/1.1 428\r\n"), nxt_string("HTTP/1.1 429\r\n"), @@ -911,10 +1012,14 @@ nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r) u_char buf[UNKNOWN_STATUS_LENGTH]; static const char chunked[] = "Transfer-Encoding: chunked\r\n"; + static const char websocket_version[] = "Sec-WebSocket-Version: 13\r\n"; - static const nxt_str_t connection[2] = { + static const nxt_str_t connection[3] = { nxt_string("Connection: close\r\n"), nxt_string("Connection: keep-alive\r\n"), + nxt_string("Upgrade: websocket\r\n" + "Connection: Upgrade\r\n" + "Sec-WebSocket-Accept: "), }; nxt_debug(task, "h1p request header send"); @@ -923,7 +1028,10 @@ nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r) h1p = r->proto.h1; n = r->status; - if (n >= NXT_HTTP_OK && n <= NXT_HTTP_LAST_SUCCESS) { + if (n >= NXT_HTTP_CONTINUE && n <= NXT_HTTP_LAST_INFORMATIONAL) { + status = &nxt_http_informational[n - NXT_HTTP_CONTINUE]; + + } else if (n >= NXT_HTTP_OK && n <= NXT_HTTP_LAST_SUCCESS) { status = &nxt_http_success[n - NXT_HTTP_OK]; } else if (n >= NXT_HTTP_MULTIPLE_CHOICES @@ -955,27 +1063,41 @@ nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r) /* Trailing CRLF at the end of header. */ size += nxt_length("\r\n"); - http11 = (h1p->parser.version.s.minor != '0'); + conn = -1; + + if (r->websocket_handshake && n == NXT_HTTP_SWITCHING_PROTOCOLS) { + h1p->websocket = 1; + h1p->keepalive = 0; + conn = 2; + size += NXT_WEBSOCKET_ACCEPT_SIZE + 2; + + } else { + http11 = (h1p->parser.version.s.minor != '0'); + + if (r->resp.content_length == NULL || r->resp.content_length->skip) { - if (r->resp.content_length == NULL || r->resp.content_length->skip) { + if (http11) { + if (n != NXT_HTTP_NOT_MODIFIED + && n != NXT_HTTP_NO_CONTENT + && !h1p->websocket) + { + h1p->chunked = 1; + size += nxt_length(chunked); + /* Trailing CRLF will be added by the first chunk header. */ + size -= nxt_length("\r\n"); + } - if (http11) { - if (n != NXT_HTTP_NOT_MODIFIED && n != NXT_HTTP_NO_CONTENT) { - h1p->chunked = 1; - size += nxt_length(chunked); - /* Trailing CRLF will be added by the first chunk header. */ - size -= nxt_length("\r\n"); + } else { + h1p->keepalive = 0; } + } - } else { - h1p->keepalive = 0; + if (http11 ^ h1p->keepalive) { + conn = h1p->keepalive; } } - conn = -1; - - if (http11 ^ h1p->keepalive) { - conn = h1p->keepalive; + if (conn >= 0) { size += connection[conn].length; } @@ -988,15 +1110,17 @@ nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r) } nxt_list_loop; + if (nxt_slow_path(n == NXT_HTTP_UPGRADE_REQUIRED)) { + size += nxt_length(websocket_version); + } + header = nxt_http_buf_mem(task, r, size); if (nxt_slow_path(header == NULL)) { nxt_h1p_request_error(task, h1p, r); return; } - p = header->mem.free; - - p = nxt_cpymem(p, status->start, status->length); + p = nxt_cpymem(header->mem.free, status->start, status->length); nxt_list_each(field, r->resp.fields) { @@ -1013,6 +1137,17 @@ nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r) p = nxt_cpymem(p, connection[conn].start, connection[conn].length); } + if (h1p->websocket) { + nxt_websocket_accept(p, h1p->websocket_key->value); + p += NXT_WEBSOCKET_ACCEPT_SIZE; + + *p++ = '\r'; *p++ = '\n'; + } + + if (nxt_slow_path(n == NXT_HTTP_UPGRADE_REQUIRED)) { + p = nxt_cpymem(p, websocket_version, nxt_length(websocket_version)); + } + if (h1p->chunked) { p = nxt_cpymem(p, chunked, nxt_length(chunked)); /* Trailing CRLF will be added by the first chunk header. */ @@ -1031,6 +1166,58 @@ nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r) c->write_state = &nxt_h1p_request_send_state; nxt_conn_write(task->thread->engine, c); + + if (h1p->websocket) { + nxt_h1p_websocket_first_frame_start(task, r, c->read); + } +} + + +void +nxt_h1p_complete_buffers(nxt_task_t *task, nxt_h1proto_t *h1p) +{ + size_t size; + nxt_buf_t *b, *in, *next; + nxt_conn_t *c; + + nxt_debug(task, "h1p complete buffers"); + + b = h1p->buffers; + c = h1p->conn; + in = c->read; + + if (b != NULL) { + if (in == NULL) { + /* A request with large body. */ + in = b; + c->read = in; + + b = in->next; + in->next = NULL; + } + + while (b != NULL) { + next = b->next; + + nxt_work_queue_add(&task->thread->engine->fast_work_queue, + b->completion_handler, task, b, b->parent); + + b = next; + } + + h1p->buffers = NULL; + h1p->nbuffers = 0; + } + + if (in != NULL) { + size = nxt_buf_mem_used_size(&in->mem); + + if (size == 0) { + nxt_mp_free(c->mem_pool, in); + + c->read = NULL; + } + } } @@ -1181,8 +1368,13 @@ nxt_h1p_conn_request_error(nxt_task_t *task, void *obj, void *data) r = h1p->request; + if (nxt_slow_path(r == NULL)) { + nxt_h1p_shutdown(task, h1p->conn); + return; + } + if (r->fields == NULL) { - (void) nxt_h1p_header_process(h1p, r); + (void) nxt_h1p_header_process(task, h1p, r); } if (r->status == 0) { @@ -1218,7 +1410,7 @@ nxt_h1p_conn_request_timeout(nxt_task_t *task, void *obj, void *data) r = h1p->request; if (r->fields == NULL) { - (void) nxt_h1p_header_process(h1p, r); + (void) nxt_h1p_header_process(task, h1p, r); } nxt_http_request_error(task, r, NXT_HTTP_REQUEST_TIMEOUT); @@ -1244,7 +1436,7 @@ nxt_h1p_conn_request_send_timeout(nxt_task_t *task, void *obj, void *data) } -static nxt_msec_t +nxt_msec_t nxt_h1p_conn_request_timer_value(nxt_conn_t *c, uintptr_t data) { nxt_h1proto_t *h1p; @@ -1351,7 +1543,7 @@ static void nxt_h1p_keepalive(nxt_task_t *task, nxt_h1proto_t *h1p, nxt_conn_t *c) { size_t size; - nxt_buf_t *in, *b, *next; + nxt_buf_t *in; nxt_debug(task, "h1p keepalive"); @@ -1359,40 +1551,22 @@ nxt_h1p_keepalive(nxt_task_t *task, nxt_h1proto_t *h1p, nxt_conn_t *c) nxt_conn_tcp_nodelay_on(task, c); } - b = h1p->buffers; + nxt_h1p_complete_buffers(task, h1p); + + in = c->read; nxt_memzero(h1p, offsetof(nxt_h1proto_t, conn)); c->sent = 0; - in = c->read; - if (in == NULL) { - /* A request with large body. */ - in = b; - c->read = in; - - b = in->next; - in->next = NULL; - } - - while (b != NULL) { - next = b->next; - nxt_mp_free(c->mem_pool, b); - b = next; - } - - size = nxt_buf_mem_used_size(&in->mem); - - if (size == 0) { - nxt_mp_free(c->mem_pool, in); - - c->read = NULL; c->read_state = &nxt_h1p_keepalive_state; nxt_conn_read(task->thread->engine, c); } else { + size = nxt_buf_mem_used_size(&in->mem); + nxt_debug(task, "h1p pipelining"); nxt_memmove(in->mem.start, in->mem.pos, size); @@ -1421,7 +1595,7 @@ static const nxt_conn_state_t nxt_h1p_keepalive_state }; -static const nxt_conn_state_t nxt_h1p_idle_close_state +const nxt_conn_state_t nxt_h1p_idle_close_state nxt_aligned(64) = { .close_handler = nxt_h1p_idle_close, @@ -1564,8 +1738,33 @@ nxt_h1p_idle_response_timer_value(nxt_conn_t *c, uintptr_t data) static void nxt_h1p_shutdown(nxt_task_t *task, nxt_conn_t *c) { + nxt_timer_t *timer; + nxt_h1proto_t *h1p; + nxt_debug(task, "h1p shutdown"); + h1p = c->socket.data; + + if (nxt_slow_path(h1p != NULL && h1p->websocket_timer != NULL)) { + timer = &h1p->websocket_timer->timer; + + if (timer->handler != nxt_h1p_conn_ws_shutdown) { + timer->handler = nxt_h1p_conn_ws_shutdown; + nxt_timer_add(task->thread->engine, timer, 0); + + } else { + nxt_debug(task, "h1p already scheduled ws shutdown"); + } + + } else { + nxt_h1p_shutdown_(task, c); + } +} + + +static void +nxt_h1p_shutdown_(nxt_task_t *task, nxt_conn_t *c) +{ c->socket.data = NULL; #if (NXT_TLS) @@ -1597,6 +1796,21 @@ static const nxt_conn_state_t nxt_h1p_shutdown_state static void +nxt_h1p_conn_ws_shutdown(nxt_task_t *task, void *obj, void *data) +{ + nxt_timer_t *timer; + nxt_h1p_websocket_timer_t *ws_timer; + + nxt_debug(task, "h1p conn ws shutdown"); + + timer = obj; + ws_timer = nxt_timer_data(timer, nxt_h1p_websocket_timer_t, timer); + + nxt_h1p_shutdown_(task, ws_timer->h1p->conn); +} + + +static void nxt_h1p_conn_closing(nxt_task_t *task, void *obj, void *data) { nxt_conn_t *c; |