diff options
author | Max Romanov <max.romanov@nginx.com> | 2019-08-20 16:31:53 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2019-08-20 16:31:53 +0300 |
commit | e501c74ddceab86e48c031ca9b5e154f52dcdae0 (patch) | |
tree | 7bfe94354df516d1ceefc5af3194ba943e443aa2 /src | |
parent | 9bbf54e23e185e94054072fff2673f6f5cd203e9 (diff) | |
download | unit-e501c74ddceab86e48c031ca9b5e154f52dcdae0.tar.gz unit-e501c74ddceab86e48c031ca9b5e154f52dcdae0.tar.bz2 |
Introducing websocket support in router and libunit.
Diffstat (limited to 'src')
-rw-r--r-- | src/nxt_conf_validation.c | 25 | ||||
-rw-r--r-- | src/nxt_h1proto.c | 370 | ||||
-rw-r--r-- | src/nxt_h1proto.h | 48 | ||||
-rw-r--r-- | src/nxt_h1proto_websocket.c | 719 | ||||
-rw-r--r-- | src/nxt_http.h | 28 | ||||
-rw-r--r-- | src/nxt_http_request.c | 21 | ||||
-rw-r--r-- | src/nxt_http_response.c | 2 | ||||
-rw-r--r-- | src/nxt_http_websocket.c | 161 | ||||
-rw-r--r-- | src/nxt_port.c | 1 | ||||
-rw-r--r-- | src/nxt_port.h | 15 | ||||
-rw-r--r-- | src/nxt_router.c | 181 | ||||
-rw-r--r-- | src/nxt_router.h | 9 | ||||
-rw-r--r-- | src/nxt_router_request.h | 71 | ||||
-rw-r--r-- | src/nxt_sha1.c | 295 | ||||
-rw-r--r-- | src/nxt_sha1.h | 24 | ||||
-rw-r--r-- | src/nxt_unit.c | 1159 | ||||
-rw-r--r-- | src/nxt_unit.h | 32 | ||||
-rw-r--r-- | src/nxt_unit_request.h | 1 | ||||
-rw-r--r-- | src/nxt_unit_typedefs.h | 27 | ||||
-rw-r--r-- | src/nxt_unit_websocket.h | 27 | ||||
-rw-r--r-- | src/nxt_websocket.c | 122 | ||||
-rw-r--r-- | src/nxt_websocket.h | 21 | ||||
-rw-r--r-- | src/nxt_websocket_accept.c | 68 | ||||
-rw-r--r-- | src/nxt_websocket_header.h | 68 | ||||
-rw-r--r-- | src/test/nxt_unit_websocket_chat.c | 348 | ||||
-rw-r--r-- | src/test/nxt_unit_websocket_echo.c | 105 |
26 files changed, 3508 insertions, 440 deletions
diff --git a/src/nxt_conf_validation.c b/src/nxt_conf_validation.c index 45c0eb41..ca8ec62e 100644 --- a/src/nxt_conf_validation.c +++ b/src/nxt_conf_validation.c @@ -102,6 +102,26 @@ static nxt_int_t nxt_conf_vldt_java_option(nxt_conf_validation_t *vldt, nxt_conf_value_t *value); +static nxt_conf_vldt_object_t nxt_conf_vldt_websocket_members[] = { + { nxt_string("read_timeout"), + NXT_CONF_VLDT_INTEGER, + NULL, + NULL }, + + { nxt_string("keepalive_interval"), + NXT_CONF_VLDT_INTEGER, + NULL, + NULL }, + + { nxt_string("max_frame_size"), + NXT_CONF_VLDT_INTEGER, + NULL, + NULL }, + + NXT_CONF_VLDT_END +}; + + static nxt_conf_vldt_object_t nxt_conf_vldt_http_members[] = { { nxt_string("header_read_timeout"), NXT_CONF_VLDT_INTEGER, @@ -128,6 +148,11 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_http_members[] = { NULL, NULL }, + { nxt_string("websocket"), + NXT_CONF_VLDT_OBJECT, + &nxt_conf_vldt_object, + (void *) &nxt_conf_vldt_websocket_members }, + NXT_CONF_VLDT_END }; diff --git a/src/nxt_h1proto.c b/src/nxt_h1proto.c index a92dfe3b..a60bdb36 100644 --- a/src/nxt_h1proto.c +++ b/src/nxt_h1proto.c @@ -6,26 +6,9 @@ #include <nxt_router.h> #include <nxt_http.h> - - -struct nxt_h1proto_s { - nxt_http_request_parse_t parser; - - uint8_t nbuffers; - uint8_t keepalive; /* 1 bit */ - uint8_t chunked; /* 1 bit */ - nxt_http_te_t transfer_encoding:8; /* 2 bits */ - - uint32_t header_size; - - nxt_http_request_t *request; - nxt_buf_t *buffers; - /* - * All fields before the conn field will - * be zeroed in a keep-alive connection. - */ - nxt_conn_t *conn; -}; +#include <nxt_h1proto.h> +#include <nxt_websocket.h> +#include <nxt_websocket_header.h> /* @@ -43,12 +26,18 @@ static void nxt_h1p_conn_proto_init(nxt_task_t *task, void *obj, void *data); static void nxt_h1p_conn_request_init(nxt_task_t *task, void *obj, void *data); static void nxt_h1p_conn_request_header_parse(nxt_task_t *task, void *obj, void *data); -static nxt_int_t nxt_h1p_header_process(nxt_h1proto_t *h1p, +static nxt_int_t nxt_h1p_header_process(nxt_task_t *task, nxt_h1proto_t *h1p, nxt_http_request_t *r); static nxt_int_t nxt_h1p_header_buffer_test(nxt_task_t *task, nxt_h1proto_t *h1p, nxt_conn_t *c, nxt_socket_conf_t *skcf); static nxt_int_t nxt_h1p_connection(void *ctx, nxt_http_field_t *field, uintptr_t data); +static nxt_int_t nxt_h1p_upgrade(void *ctx, nxt_http_field_t *field, + uintptr_t data); +static nxt_int_t nxt_h1p_websocket_key(void *ctx, nxt_http_field_t *field, + uintptr_t data); +static nxt_int_t nxt_h1p_websocket_version(void *ctx, nxt_http_field_t *field, + uintptr_t data); static nxt_int_t nxt_h1p_transfer_encoding(void *ctx, nxt_http_field_t *field, uintptr_t data); static void nxt_h1p_request_body_read(nxt_task_t *task, nxt_http_request_t *r); @@ -70,8 +59,6 @@ static void nxt_h1p_conn_request_timeout(nxt_task_t *task, void *obj, void *data); static void nxt_h1p_conn_request_send_timeout(nxt_task_t *task, void *obj, void *data); -static nxt_msec_t nxt_h1p_conn_request_timer_value(nxt_conn_t *c, - uintptr_t data); nxt_inline void nxt_h1p_request_error(nxt_task_t *task, nxt_h1proto_t *h1p, nxt_http_request_t *r); static void nxt_h1p_request_close(nxt_task_t *task, nxt_http_proto_t proto, @@ -91,16 +78,16 @@ static void nxt_h1p_idle_response_timeout(nxt_task_t *task, void *obj, static nxt_msec_t nxt_h1p_idle_response_timer_value(nxt_conn_t *c, uintptr_t data); static void nxt_h1p_shutdown(nxt_task_t *task, nxt_conn_t *c); +static void nxt_h1p_shutdown_(nxt_task_t *task, nxt_conn_t *c); +static void nxt_h1p_conn_ws_shutdown(nxt_task_t *task, void *obj, void *data); static void nxt_h1p_conn_closing(nxt_task_t *task, void *obj, void *data); static void nxt_h1p_conn_free(nxt_task_t *task, void *obj, void *data); - #if (NXT_TLS) static const nxt_conn_state_t nxt_http_idle_state; static const nxt_conn_state_t nxt_h1p_shutdown_state; #endif static const nxt_conn_state_t nxt_h1p_idle_state; -static const nxt_conn_state_t nxt_h1p_idle_close_state; static const nxt_conn_state_t nxt_h1p_header_parse_state; static const nxt_conn_state_t nxt_h1p_read_body_state; static const nxt_conn_state_t nxt_h1p_request_send_state; @@ -119,6 +106,7 @@ const nxt_http_proto_table_t nxt_http_proto[3] = { .body_bytes_sent = nxt_h1p_request_body_bytes_sent, .discard = nxt_h1p_request_discard, .close = nxt_h1p_request_close, + .ws_frame_start = nxt_h1p_websocket_frame_start, }, /* NXT_HTTP_PROTO_H2 */ /* NXT_HTTP_PROTO_DEVNULL */ @@ -129,6 +117,10 @@ static nxt_lvlhsh_t nxt_h1p_fields_hash; static nxt_http_field_proc_t nxt_h1p_fields[] = { { nxt_string("Connection"), &nxt_h1p_connection, 0 }, + { nxt_string("Upgrade"), &nxt_h1p_upgrade, 0 }, + { nxt_string("Sec-WebSocket-Key"), &nxt_h1p_websocket_key, 0 }, + { nxt_string("Sec-WebSocket-Version"), + &nxt_h1p_websocket_version, 0 }, { nxt_string("Transfer-Encoding"), &nxt_h1p_transfer_encoding, 0 }, { nxt_string("Host"), &nxt_http_request_host, 0 }, @@ -503,7 +495,7 @@ nxt_h1p_conn_request_header_parse(nxt_task_t *task, void *obj, void *data) */ h1p->keepalive = (h1p->parser.version.s.minor != '0'); - ret = nxt_h1p_header_process(h1p, r); + ret = nxt_h1p_header_process(task, h1p, r); if (nxt_fast_path(ret == NXT_OK)) { @@ -551,7 +543,7 @@ nxt_h1p_conn_request_header_parse(nxt_task_t *task, void *obj, void *data) break; } - (void) nxt_h1p_header_process(h1p, r); + (void) nxt_h1p_header_process(task, h1p, r); error: @@ -562,8 +554,12 @@ error: static nxt_int_t -nxt_h1p_header_process(nxt_h1proto_t *h1p, nxt_http_request_t *r) +nxt_h1p_header_process(nxt_task_t *task, nxt_h1proto_t *h1p, + nxt_http_request_t *r) { + u_char *m; + nxt_int_t ret; + r->target.start = h1p->parser.target_start; r->target.length = h1p->parser.target_end - h1p->parser.target_start; @@ -578,7 +574,46 @@ nxt_h1p_header_process(nxt_h1proto_t *h1p, nxt_http_request_t *r) r->fields = h1p->parser.fields; - return nxt_http_fields_process(r->fields, &nxt_h1p_fields_hash, r); + ret = nxt_http_fields_process(r->fields, &nxt_h1p_fields_hash, r); + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } + + if (h1p->connection_upgrade && h1p->upgrade_websocket) { + m = h1p->parser.method.start; + + if (nxt_slow_path(h1p->parser.method.length != 3 + || m[0] != 'G' + || m[1] != 'E' + || m[2] != 'T')) + { + nxt_log(task, NXT_LOG_INFO, "h1p upgrade: bad method"); + + return NXT_HTTP_BAD_REQUEST; + } + + if (nxt_slow_path(h1p->parser.version.s.minor != '1')) { + nxt_log(task, NXT_LOG_INFO, "h1p upgrade: bad protocol version"); + + return NXT_HTTP_BAD_REQUEST; + } + + if (nxt_slow_path(h1p->websocket_key == NULL)) { + nxt_log(task, NXT_LOG_INFO, "h1p upgrade: bad or absent websocket key"); + + return NXT_HTTP_BAD_REQUEST; + } + + if (nxt_slow_path(h1p->websocket_version_ok == 0)) { + nxt_log(task, NXT_LOG_INFO, "h1p upgrade: bad or absent websocket version"); + + return NXT_HTTP_UPGRADE_REQUIRED; + } + + r->websocket_handshake = 1; + } + + return ret; } @@ -620,12 +655,68 @@ nxt_h1p_header_buffer_test(nxt_task_t *task, nxt_h1proto_t *h1p, nxt_conn_t *c, static nxt_int_t nxt_h1p_connection(void *ctx, nxt_http_field_t *field, uintptr_t data) { - nxt_http_request_t *r; + nxt_http_request_t *r; + static const u_char *upgrade = (const u_char *) "upgrade"; r = ctx; if (field->value_length == 5 && nxt_memcmp(field->value, "close", 5) == 0) { r->proto.h1->keepalive = 0; + + } else if (field->value_length == 7 + && nxt_memcasecmp(field->value, upgrade, 7) == 0) + { + r->proto.h1->connection_upgrade = 1; + } + + return NXT_OK; +} + + +static nxt_int_t +nxt_h1p_upgrade(void *ctx, nxt_http_field_t *field, uintptr_t data) +{ + nxt_http_request_t *r; + static const u_char *websocket = (const u_char *) "websocket"; + + r = ctx; + + if (field->value_length == 9 + && nxt_memcasecmp(field->value, websocket, 9) == 0) + { + r->proto.h1->upgrade_websocket = 1; + } + + return NXT_OK; +} + + +static nxt_int_t +nxt_h1p_websocket_key(void *ctx, nxt_http_field_t *field, uintptr_t data) +{ + nxt_http_request_t *r; + + r = ctx; + + if (field->value_length == 24) { + r->proto.h1->websocket_key = field; + } + + return NXT_OK; +} + + +static nxt_int_t +nxt_h1p_websocket_version(void *ctx, nxt_http_field_t *field, uintptr_t data) +{ + nxt_http_request_t *r; + + r = ctx; + + if (field->value_length == 2 + && field->value[0] == '1' && field->value[1] == '3') + { + r->proto.h1->websocket_version_ok = 1; } return NXT_OK; @@ -727,6 +818,7 @@ nxt_h1p_request_body_read(nxt_task_t *task, nxt_http_request_t *r) if (size != 0) { in->next = h1p->buffers; h1p->buffers = in; + h1p->nbuffers++; c = h1p->conn; c->read = b; @@ -805,6 +897,15 @@ nxt_h1p_request_local_addr(nxt_task_t *task, nxt_http_request_t *r) } +#define NXT_HTTP_LAST_INFORMATIONAL \ + (NXT_HTTP_CONTINUE + nxt_nitems(nxt_http_informational) - 1) + +static const nxt_str_t nxt_http_informational[] = { + nxt_string("HTTP/1.1 100 Continue\r\n"), + nxt_string("HTTP/1.1 101 Switching Protocols\r\n"), +}; + + #define NXT_HTTP_LAST_SUCCESS \ (NXT_HTTP_OK + nxt_nitems(nxt_http_success) - 1) @@ -861,7 +962,7 @@ static const nxt_str_t nxt_http_client_error[] = { nxt_string("HTTP/1.1 423\r\n"), nxt_string("HTTP/1.1 424\r\n"), nxt_string("HTTP/1.1 425\r\n"), - nxt_string("HTTP/1.1 426\r\n"), + nxt_string("HTTP/1.1 426 Upgrade Required\r\n"), nxt_string("HTTP/1.1 427\r\n"), nxt_string("HTTP/1.1 428\r\n"), nxt_string("HTTP/1.1 429\r\n"), @@ -911,10 +1012,14 @@ nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r) u_char buf[UNKNOWN_STATUS_LENGTH]; static const char chunked[] = "Transfer-Encoding: chunked\r\n"; + static const char websocket_version[] = "Sec-WebSocket-Version: 13\r\n"; - static const nxt_str_t connection[2] = { + static const nxt_str_t connection[3] = { nxt_string("Connection: close\r\n"), nxt_string("Connection: keep-alive\r\n"), + nxt_string("Upgrade: websocket\r\n" + "Connection: Upgrade\r\n" + "Sec-WebSocket-Accept: "), }; nxt_debug(task, "h1p request header send"); @@ -923,7 +1028,10 @@ nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r) h1p = r->proto.h1; n = r->status; - if (n >= NXT_HTTP_OK && n <= NXT_HTTP_LAST_SUCCESS) { + if (n >= NXT_HTTP_CONTINUE && n <= NXT_HTTP_LAST_INFORMATIONAL) { + status = &nxt_http_informational[n - NXT_HTTP_CONTINUE]; + + } else if (n >= NXT_HTTP_OK && n <= NXT_HTTP_LAST_SUCCESS) { status = &nxt_http_success[n - NXT_HTTP_OK]; } else if (n >= NXT_HTTP_MULTIPLE_CHOICES @@ -955,27 +1063,41 @@ nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r) /* Trailing CRLF at the end of header. */ size += nxt_length("\r\n"); - http11 = (h1p->parser.version.s.minor != '0'); + conn = -1; + + if (r->websocket_handshake && n == NXT_HTTP_SWITCHING_PROTOCOLS) { + h1p->websocket = 1; + h1p->keepalive = 0; + conn = 2; + size += NXT_WEBSOCKET_ACCEPT_SIZE + 2; + + } else { + http11 = (h1p->parser.version.s.minor != '0'); + + if (r->resp.content_length == NULL || r->resp.content_length->skip) { - if (r->resp.content_length == NULL || r->resp.content_length->skip) { + if (http11) { + if (n != NXT_HTTP_NOT_MODIFIED + && n != NXT_HTTP_NO_CONTENT + && !h1p->websocket) + { + h1p->chunked = 1; + size += nxt_length(chunked); + /* Trailing CRLF will be added by the first chunk header. */ + size -= nxt_length("\r\n"); + } - if (http11) { - if (n != NXT_HTTP_NOT_MODIFIED && n != NXT_HTTP_NO_CONTENT) { - h1p->chunked = 1; - size += nxt_length(chunked); - /* Trailing CRLF will be added by the first chunk header. */ - size -= nxt_length("\r\n"); + } else { + h1p->keepalive = 0; } + } - } else { - h1p->keepalive = 0; + if (http11 ^ h1p->keepalive) { + conn = h1p->keepalive; } } - conn = -1; - - if (http11 ^ h1p->keepalive) { - conn = h1p->keepalive; + if (conn >= 0) { size += connection[conn].length; } @@ -988,15 +1110,17 @@ nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r) } nxt_list_loop; + if (nxt_slow_path(n == NXT_HTTP_UPGRADE_REQUIRED)) { + size += nxt_length(websocket_version); + } + header = nxt_http_buf_mem(task, r, size); if (nxt_slow_path(header == NULL)) { nxt_h1p_request_error(task, h1p, r); return; } - p = header->mem.free; - - p = nxt_cpymem(p, status->start, status->length); + p = nxt_cpymem(header->mem.free, status->start, status->length); nxt_list_each(field, r->resp.fields) { @@ -1013,6 +1137,17 @@ nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r) p = nxt_cpymem(p, connection[conn].start, connection[conn].length); } + if (h1p->websocket) { + nxt_websocket_accept(p, h1p->websocket_key->value); + p += NXT_WEBSOCKET_ACCEPT_SIZE; + + *p++ = '\r'; *p++ = '\n'; + } + + if (nxt_slow_path(n == NXT_HTTP_UPGRADE_REQUIRED)) { + p = nxt_cpymem(p, websocket_version, nxt_length(websocket_version)); + } + if (h1p->chunked) { p = nxt_cpymem(p, chunked, nxt_length(chunked)); /* Trailing CRLF will be added by the first chunk header. */ @@ -1031,6 +1166,58 @@ nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r) c->write_state = &nxt_h1p_request_send_state; nxt_conn_write(task->thread->engine, c); + + if (h1p->websocket) { + nxt_h1p_websocket_first_frame_start(task, r, c->read); + } +} + + +void +nxt_h1p_complete_buffers(nxt_task_t *task, nxt_h1proto_t *h1p) +{ + size_t size; + nxt_buf_t *b, *in, *next; + nxt_conn_t *c; + + nxt_debug(task, "h1p complete buffers"); + + b = h1p->buffers; + c = h1p->conn; + in = c->read; + + if (b != NULL) { + if (in == NULL) { + /* A request with large body. */ + in = b; + c->read = in; + + b = in->next; + in->next = NULL; + } + + while (b != NULL) { + next = b->next; + + nxt_work_queue_add(&task->thread->engine->fast_work_queue, + b->completion_handler, task, b, b->parent); + + b = next; + } + + h1p->buffers = NULL; + h1p->nbuffers = 0; + } + + if (in != NULL) { + size = nxt_buf_mem_used_size(&in->mem); + + if (size == 0) { + nxt_mp_free(c->mem_pool, in); + + c->read = NULL; + } + } } @@ -1181,8 +1368,13 @@ nxt_h1p_conn_request_error(nxt_task_t *task, void *obj, void *data) r = h1p->request; + if (nxt_slow_path(r == NULL)) { + nxt_h1p_shutdown(task, h1p->conn); + return; + } + if (r->fields == NULL) { - (void) nxt_h1p_header_process(h1p, r); + (void) nxt_h1p_header_process(task, h1p, r); } if (r->status == 0) { @@ -1218,7 +1410,7 @@ nxt_h1p_conn_request_timeout(nxt_task_t *task, void *obj, void *data) r = h1p->request; if (r->fields == NULL) { - (void) nxt_h1p_header_process(h1p, r); + (void) nxt_h1p_header_process(task, h1p, r); } nxt_http_request_error(task, r, NXT_HTTP_REQUEST_TIMEOUT); @@ -1244,7 +1436,7 @@ nxt_h1p_conn_request_send_timeout(nxt_task_t *task, void *obj, void *data) } -static nxt_msec_t +nxt_msec_t nxt_h1p_conn_request_timer_value(nxt_conn_t *c, uintptr_t data) { nxt_h1proto_t *h1p; @@ -1351,7 +1543,7 @@ static void nxt_h1p_keepalive(nxt_task_t *task, nxt_h1proto_t *h1p, nxt_conn_t *c) { size_t size; - nxt_buf_t *in, *b, *next; + nxt_buf_t *in; nxt_debug(task, "h1p keepalive"); @@ -1359,40 +1551,22 @@ nxt_h1p_keepalive(nxt_task_t *task, nxt_h1proto_t *h1p, nxt_conn_t *c) nxt_conn_tcp_nodelay_on(task, c); } - b = h1p->buffers; + nxt_h1p_complete_buffers(task, h1p); + + in = c->read; nxt_memzero(h1p, offsetof(nxt_h1proto_t, conn)); c->sent = 0; - in = c->read; - if (in == NULL) { - /* A request with large body. */ - in = b; - c->read = in; - - b = in->next; - in->next = NULL; - } - - while (b != NULL) { - next = b->next; - nxt_mp_free(c->mem_pool, b); - b = next; - } - - size = nxt_buf_mem_used_size(&in->mem); - - if (size == 0) { - nxt_mp_free(c->mem_pool, in); - - c->read = NULL; c->read_state = &nxt_h1p_keepalive_state; nxt_conn_read(task->thread->engine, c); } else { + size = nxt_buf_mem_used_size(&in->mem); + nxt_debug(task, "h1p pipelining"); nxt_memmove(in->mem.start, in->mem.pos, size); @@ -1421,7 +1595,7 @@ static const nxt_conn_state_t nxt_h1p_keepalive_state }; -static const nxt_conn_state_t nxt_h1p_idle_close_state +const nxt_conn_state_t nxt_h1p_idle_close_state nxt_aligned(64) = { .close_handler = nxt_h1p_idle_close, @@ -1564,8 +1738,33 @@ nxt_h1p_idle_response_timer_value(nxt_conn_t *c, uintptr_t data) static void nxt_h1p_shutdown(nxt_task_t *task, nxt_conn_t *c) { + nxt_timer_t *timer; + nxt_h1proto_t *h1p; + nxt_debug(task, "h1p shutdown"); + h1p = c->socket.data; + + if (nxt_slow_path(h1p != NULL && h1p->websocket_timer != NULL)) { + timer = &h1p->websocket_timer->timer; + + if (timer->handler != nxt_h1p_conn_ws_shutdown) { + timer->handler = nxt_h1p_conn_ws_shutdown; + nxt_timer_add(task->thread->engine, timer, 0); + + } else { + nxt_debug(task, "h1p already scheduled ws shutdown"); + } + + } else { + nxt_h1p_shutdown_(task, c); + } +} + + +static void +nxt_h1p_shutdown_(nxt_task_t *task, nxt_conn_t *c) +{ c->socket.data = NULL; #if (NXT_TLS) @@ -1597,6 +1796,21 @@ static const nxt_conn_state_t nxt_h1p_shutdown_state static void +nxt_h1p_conn_ws_shutdown(nxt_task_t *task, void *obj, void *data) +{ + nxt_timer_t *timer; + nxt_h1p_websocket_timer_t *ws_timer; + + nxt_debug(task, "h1p conn ws shutdown"); + + timer = obj; + ws_timer = nxt_timer_data(timer, nxt_h1p_websocket_timer_t, timer); + + nxt_h1p_shutdown_(task, ws_timer->h1p->conn); +} + + +static void nxt_h1p_conn_closing(nxt_task_t *task, void *obj, void *data) { nxt_conn_t *c; diff --git a/src/nxt_h1proto.h b/src/nxt_h1proto.h new file mode 100644 index 00000000..c6d3bd53 --- /dev/null +++ b/src/nxt_h1proto.h @@ -0,0 +1,48 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + +#ifndef _NXT_H1PROTO_H_INCLUDED_ +#define _NXT_H1PROTO_H_INCLUDED_ + + +#include <nxt_main.h> +#include <nxt_http_parse.h> +#include <nxt_http.h> +#include <nxt_router.h> + + +typedef struct nxt_h1p_websocket_timer_s nxt_h1p_websocket_timer_t; + + +struct nxt_h1proto_s { + nxt_http_request_parse_t parser; + + uint8_t nbuffers; + uint8_t keepalive; /* 1 bit */ + uint8_t chunked; /* 1 bit */ + uint8_t websocket; /* 1 bit */ + uint8_t connection_upgrade; /* 1 bit */ + uint8_t upgrade_websocket; /* 1 bit */ + uint8_t websocket_version_ok; /* 1 bit */ + nxt_http_te_t transfer_encoding:8; /* 2 bits */ + + uint8_t websocket_cont_expected; /* 1 bit */ + uint8_t websocket_closed; /* 1 bit */ + + uint32_t header_size; + + nxt_http_field_t *websocket_key; + nxt_h1p_websocket_timer_t *websocket_timer; + + nxt_http_request_t *request; + nxt_buf_t *buffers; + /* + * All fields before the conn field will + * be zeroed in a keep-alive connection. + */ + nxt_conn_t *conn; +}; + +#endif /* _NXT_H1PROTO_H_INCLUDED_ */ diff --git a/src/nxt_h1proto_websocket.c b/src/nxt_h1proto_websocket.c new file mode 100644 index 00000000..dd9b6848 --- /dev/null +++ b/src/nxt_h1proto_websocket.c @@ -0,0 +1,719 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + +#include <nxt_main.h> +#include <nxt_router.h> +#include <nxt_http.h> +#include <nxt_h1proto.h> +#include <nxt_websocket.h> +#include <nxt_websocket_header.h> + +typedef struct { + uint16_t code; + uint8_t args; + nxt_str_t desc; +} nxt_ws_error_t; + +static void nxt_h1p_conn_ws_keepalive(nxt_task_t *task, void *obj, void *data); +static void nxt_h1p_conn_ws_frame_header_read(nxt_task_t *task, void *obj, + void *data); +static void nxt_h1p_conn_ws_keepalive_disable(nxt_task_t *task, + nxt_h1proto_t *h1p); +static void nxt_h1p_conn_ws_keepalive_enable(nxt_task_t *task, + nxt_h1proto_t *h1p); +static void nxt_h1p_conn_ws_frame_process(nxt_task_t *task, nxt_conn_t *c, + nxt_h1proto_t *h1p, nxt_websocket_header_t *wsh); +static void nxt_h1p_conn_ws_error(nxt_task_t *task, void *obj, void *data); +static ssize_t nxt_h1p_ws_io_read_handler(nxt_conn_t *c); +static void nxt_h1p_conn_ws_timeout(nxt_task_t *task, void *obj, void *data); +static void nxt_h1p_conn_ws_frame_payload_read(nxt_task_t *task, void *obj, + void *data); +static void hxt_h1p_send_ws_error(nxt_task_t *task, nxt_http_request_t *r, + const nxt_ws_error_t *err, ...); +static void nxt_h1p_conn_ws_error_sent(nxt_task_t *task, void *obj, void *data); +static void nxt_h1p_conn_ws_pong(nxt_task_t *task, void *obj, void *data); + +static const nxt_conn_state_t nxt_h1p_read_ws_frame_header_state; +static const nxt_conn_state_t nxt_h1p_read_ws_frame_payload_state; + +static const nxt_ws_error_t nxt_ws_err_out_of_memory = { + NXT_WEBSOCKET_CR_INTERNAL_SERVER_ERROR, + 0, nxt_string("Out of memory") }; +static const nxt_ws_error_t nxt_ws_err_too_big = { + NXT_WEBSOCKET_CR_MESSAGE_TOO_BIG, + 1, nxt_string("Message too big: %uL bytes") }; +static const nxt_ws_error_t nxt_ws_err_invalid_close_code = { + NXT_WEBSOCKET_CR_PROTOCOL_ERROR, + 1, nxt_string("Close code %ud is not valid") }; +static const nxt_ws_error_t nxt_ws_err_going_away = { + NXT_WEBSOCKET_CR_GOING_AWAY, + 0, nxt_string("Remote peer is going away") }; +static const nxt_ws_error_t nxt_ws_err_not_masked = { + NXT_WEBSOCKET_CR_PROTOCOL_ERROR, + 0, nxt_string("Not masked client frame") }; +static const nxt_ws_error_t nxt_ws_err_ctrl_fragmented = { + NXT_WEBSOCKET_CR_PROTOCOL_ERROR, + 0, nxt_string("Fragmented control frame") }; +static const nxt_ws_error_t nxt_ws_err_ctrl_too_big = { + NXT_WEBSOCKET_CR_PROTOCOL_ERROR, + 1, nxt_string("Control frame too big: %uL bytes") }; +static const nxt_ws_error_t nxt_ws_err_invalid_close_len = { + NXT_WEBSOCKET_CR_PROTOCOL_ERROR, + 0, nxt_string("Close frame payload length cannot be 1") }; +static const nxt_ws_error_t nxt_ws_err_invalid_opcode = { + NXT_WEBSOCKET_CR_PROTOCOL_ERROR, + 1, nxt_string("Unrecognized opcode %ud") }; +static const nxt_ws_error_t nxt_ws_err_cont_expected = { + NXT_WEBSOCKET_CR_PROTOCOL_ERROR, + 1, nxt_string("Continuation expected, but %ud opcode received") }; + +void +nxt_h1p_websocket_first_frame_start(nxt_task_t *task, nxt_http_request_t *r, + nxt_buf_t *ws_frame) +{ + nxt_conn_t *c; + nxt_timer_t *timer; + nxt_h1proto_t *h1p; + nxt_socket_conf_joint_t *joint; + + nxt_debug(task, "h1p ws first frame start"); + + h1p = r->proto.h1; + c = h1p->conn; + + if (!c->tcp_nodelay) { + nxt_conn_tcp_nodelay_on(task, c); + } + + joint = c->listen->socket.data; + + if (nxt_slow_path(joint != NULL + && joint->socket_conf->websocket_conf.keepalive_interval != 0)) + { + h1p->websocket_timer = nxt_mp_zget(c->mem_pool, + sizeof(nxt_h1p_websocket_timer_t)); + if (nxt_slow_path(h1p->websocket_timer == NULL)) { + hxt_h1p_send_ws_error(task, r, &nxt_ws_err_out_of_memory); + return; + } + + h1p->websocket_timer->keepalive_interval = + joint->socket_conf->websocket_conf.keepalive_interval; + h1p->websocket_timer->h1p = h1p; + + timer = &h1p->websocket_timer->timer; + timer->task = &c->task; + timer->work_queue = &task->thread->engine->fast_work_queue; + timer->log = &c->log; + timer->bias = NXT_TIMER_DEFAULT_BIAS; + timer->handler = nxt_h1p_conn_ws_keepalive; + } + + nxt_h1p_websocket_frame_start(task, r, ws_frame); +} + + +void +nxt_h1p_websocket_frame_start(nxt_task_t *task, nxt_http_request_t *r, + nxt_buf_t *ws_frame) +{ + size_t size; + nxt_buf_t *in; + nxt_conn_t *c; + nxt_h1proto_t *h1p; + + nxt_debug(task, "h1p ws frame start"); + + h1p = r->proto.h1; + + if (nxt_slow_path(h1p->websocket_closed)) { + return; + } + + c = h1p->conn; + c->read = ws_frame; + + nxt_h1p_complete_buffers(task, h1p); + + in = c->read; + c->read_state = &nxt_h1p_read_ws_frame_header_state; + + if (in == NULL) { + nxt_conn_read(task->thread->engine, c); + nxt_h1p_conn_ws_keepalive_enable(task, h1p); + + } else { + size = nxt_buf_mem_used_size(&in->mem); + + nxt_debug(task, "h1p read client ws frame"); + + nxt_memmove(in->mem.start, in->mem.pos, size); + + in->mem.pos = in->mem.start; + in->mem.free = in->mem.start + size; + + nxt_h1p_conn_ws_frame_header_read(task, c, h1p); + } +} + + +static void +nxt_h1p_conn_ws_keepalive(nxt_task_t *task, void *obj, void *data) +{ + nxt_buf_t *out; + nxt_timer_t *timer; + nxt_h1proto_t *h1p; + nxt_http_request_t *r; + nxt_websocket_header_t *wsh; + nxt_h1p_websocket_timer_t *ws_timer; + + nxt_debug(task, "h1p conn ws keepalive"); + + timer = obj; + ws_timer = nxt_timer_data(timer, nxt_h1p_websocket_timer_t, timer); + h1p = ws_timer->h1p; + + r = h1p->request; + if (nxt_slow_path(r == NULL)) { + return; + } + + out = nxt_http_buf_mem(task, r, 2); + if (nxt_slow_path(out == NULL)) { + nxt_http_request_error_handler(task, r, r->proto.any); + return; + } + + out->mem.start[0] = 0; + out->mem.start[1] = 0; + + wsh = (nxt_websocket_header_t *) out->mem.start; + out->mem.free = nxt_websocket_frame_init(wsh, 0); + + wsh->fin = 1; + wsh->opcode = NXT_WEBSOCKET_OP_PING; + + nxt_http_request_send(task, r, out); +} + + +static const nxt_conn_state_t nxt_h1p_read_ws_frame_header_state + nxt_aligned(64) = +{ + .ready_handler = nxt_h1p_conn_ws_frame_header_read, + .close_handler = nxt_h1p_conn_ws_error, + .error_handler = nxt_h1p_conn_ws_error, + + .io_read_handler = nxt_h1p_ws_io_read_handler, + + .timer_handler = nxt_h1p_conn_ws_timeout, + .timer_value = nxt_h1p_conn_request_timer_value, + .timer_data = offsetof(nxt_socket_conf_t, websocket_conf.read_timeout), + .timer_autoreset = 1, +}; + + +static void +nxt_h1p_conn_ws_frame_header_read(nxt_task_t *task, void *obj, void *data) +{ + size_t size, hsize, frame_size, max_frame_size; + uint64_t payload_len; + nxt_conn_t *c; + nxt_h1proto_t *h1p; + nxt_http_request_t *r; + nxt_event_engine_t *engine; + nxt_websocket_header_t *wsh; + nxt_socket_conf_joint_t *joint; + + c = obj; + h1p = data; + + nxt_h1p_conn_ws_keepalive_disable(task, h1p); + + size = nxt_buf_mem_used_size(&c->read->mem); + + engine = task->thread->engine; + + if (size < 2) { + nxt_debug(task, "h1p conn ws frame header read %z", size); + + nxt_conn_read(engine, c); + nxt_h1p_conn_ws_keepalive_enable(task, h1p); + + return; + } + + wsh = (nxt_websocket_header_t *) c->read->mem.pos; + + hsize = nxt_websocket_frame_header_size(wsh); + + if (size < hsize) { + nxt_debug(task, "h1p conn ws frame header read %z < %z", size, hsize); + + nxt_conn_read(engine, c); + nxt_h1p_conn_ws_keepalive_enable(task, h1p); + + return; + } + + r = h1p->request; + if (nxt_slow_path(r == NULL)) { + return; + } + + r->ws_frame = c->read; + + joint = c->listen->socket.data; + + if (nxt_slow_path(joint == NULL)) { + /* + * Listening socket had been closed while + * connection was in keep-alive state. + */ + c->read_state = &nxt_h1p_idle_close_state; + return; + } + + if (nxt_slow_path(wsh->mask == 0)) { + hxt_h1p_send_ws_error(task, r, &nxt_ws_err_not_masked); + return; + } + + if ((wsh->opcode & NXT_WEBSOCKET_OP_CTRL) != 0) { + if (nxt_slow_path(wsh->fin == 0)) { + hxt_h1p_send_ws_error(task, r, &nxt_ws_err_ctrl_fragmented); + return; + } + + if (nxt_slow_path(wsh->opcode != NXT_WEBSOCKET_OP_PING + && wsh->opcode != NXT_WEBSOCKET_OP_PONG + && wsh->opcode != NXT_WEBSOCKET_OP_CLOSE)) + { + hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_opcode, + wsh->opcode); + return; + } + + if (nxt_slow_path(wsh->payload_len > 125)) { + hxt_h1p_send_ws_error(task, r, &nxt_ws_err_ctrl_too_big, + nxt_websocket_frame_payload_len(wsh)); + return; + } + + if (nxt_slow_path(wsh->opcode == NXT_WEBSOCKET_OP_CLOSE + && wsh->payload_len == 1)) + { + hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_close_len); + return; + } + + } else { + if (h1p->websocket_cont_expected) { + if (nxt_slow_path(wsh->opcode != NXT_WEBSOCKET_OP_CONT)) { + hxt_h1p_send_ws_error(task, r, &nxt_ws_err_cont_expected, + wsh->opcode); + return; + } + + } else { + if (nxt_slow_path(wsh->opcode != NXT_WEBSOCKET_OP_BINARY + && wsh->opcode != NXT_WEBSOCKET_OP_TEXT)) + { + hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_opcode, + wsh->opcode); + return; + } + } + + h1p->websocket_cont_expected = !wsh->fin; + } + + max_frame_size = joint->socket_conf->websocket_conf.max_frame_size; + + payload_len = nxt_websocket_frame_payload_len(wsh); + + if (nxt_slow_path(hsize > max_frame_size + || payload_len > (max_frame_size - hsize))) + { + hxt_h1p_send_ws_error(task, r, &nxt_ws_err_too_big, payload_len); + return; + } + + c->read_state = &nxt_h1p_read_ws_frame_payload_state; + + frame_size = payload_len + hsize; + + nxt_debug(task, "h1p conn ws frame header read: %z, %z", size, frame_size); + + if (frame_size <= size) { + nxt_h1p_conn_ws_frame_process(task, c, h1p, wsh); + + return; + } + + if (frame_size < (size_t) nxt_buf_mem_size(&c->read->mem)) { + c->read->mem.end = c->read->mem.start + frame_size; + + } else { + nxt_buf_t *b = nxt_buf_mem_alloc(c->mem_pool, frame_size - size, 0); + + c->read->next = b; + c->read = b; + } + + nxt_conn_read(engine, c); + nxt_h1p_conn_ws_keepalive_enable(task, h1p); +} + + +static void +nxt_h1p_conn_ws_keepalive_disable(nxt_task_t *task, nxt_h1proto_t *h1p) +{ + nxt_timer_t *timer; + + if (h1p->websocket_timer == NULL) { + return; + } + + timer = &h1p->websocket_timer->timer; + + if (nxt_slow_path(timer->handler != nxt_h1p_conn_ws_keepalive)) { + nxt_debug(task, "h1p ws keepalive disable: scheduled ws shutdown"); + return; + } + + nxt_timer_disable(task->thread->engine, timer); +} + + +static void +nxt_h1p_conn_ws_keepalive_enable(nxt_task_t *task, nxt_h1proto_t *h1p) +{ + nxt_timer_t *timer; + + if (h1p->websocket_timer == NULL) { + return; + } + + timer = &h1p->websocket_timer->timer; + + if (nxt_slow_path(timer->handler != nxt_h1p_conn_ws_keepalive)) { + nxt_debug(task, "h1p ws keepalive enable: scheduled ws shutdown"); + return; + } + + nxt_timer_add(task->thread->engine, timer, + h1p->websocket_timer->keepalive_interval); +} + + +static void +nxt_h1p_conn_ws_frame_process(nxt_task_t *task, nxt_conn_t *c, + nxt_h1proto_t *h1p, nxt_websocket_header_t *wsh) +{ + size_t hsize; + uint8_t *p, *mask; + uint16_t code; + nxt_http_request_t *r; + nxt_event_engine_t *engine; + + engine = task->thread->engine; + r = h1p->request; + + c->read = NULL; + + if (nxt_slow_path(wsh->opcode == NXT_WEBSOCKET_OP_PING)) { + nxt_work_queue_add(&engine->fast_work_queue, nxt_h1p_conn_ws_pong, + task, r, NULL); + return; + } + + if (nxt_slow_path(wsh->opcode == NXT_WEBSOCKET_OP_CLOSE)) { + if (wsh->payload_len >= 2) { + hsize = nxt_websocket_frame_header_size(wsh); + mask = nxt_pointer_to(wsh, hsize - 4); + p = nxt_pointer_to(wsh, hsize); + + code = ((p[0] ^ mask[0]) << 8) + (p[1] ^ mask[1]); + + if (nxt_slow_path(code < 1000 || code >= 5000 + || (code > 1003 && code < 1007) + || (code > 1014 && code < 3000))) + { + hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_close_code, + code); + return; + } + } + + h1p->websocket_closed = 1; + } + + nxt_work_queue_add(&engine->fast_work_queue, r->state->ready_handler, + task, r, NULL); +} + + +static void +nxt_h1p_conn_ws_error(nxt_task_t *task, void *obj, void *data) +{ + nxt_h1proto_t *h1p; + nxt_http_request_t *r; + + h1p = data; + + nxt_debug(task, "h1p conn ws error"); + + r = h1p->request; + + h1p->keepalive = 0; + + if (nxt_fast_path(r != NULL)) { + r->state->error_handler(task, r, h1p); + } +} + + +static ssize_t +nxt_h1p_ws_io_read_handler(nxt_conn_t *c) +{ + size_t size; + ssize_t n; + nxt_buf_t *b; + + b = c->read; + + if (b == NULL) { + /* Enough for control frame. */ + size = 10 + 125; + + b = nxt_buf_mem_alloc(c->mem_pool, size, 0); + if (nxt_slow_path(b == NULL)) { + c->socket.error = NXT_ENOMEM; + return NXT_ERROR; + } + } + + n = c->io->recvbuf(c, b); + + if (n > 0) { + c->read = b; + + } else { + c->read = NULL; + nxt_mp_free(c->mem_pool, b); + } + + return n; +} + + +static void +nxt_h1p_conn_ws_timeout(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + nxt_timer_t *timer; + nxt_h1proto_t *h1p; + nxt_http_request_t *r; + + timer = obj; + + nxt_debug(task, "h1p conn ws timeout"); + + c = nxt_read_timer_conn(timer); + c->block_read = 1; + /* + * Disable SO_LINGER off during socket closing + * to send "408 Request Timeout" error response. + */ + c->socket.timedout = 0; + + h1p = c->socket.data; + h1p->keepalive = 0; + + r = h1p->request; + if (nxt_slow_path(r == NULL)) { + return; + } + + hxt_h1p_send_ws_error(task, r, &nxt_ws_err_going_away); +} + + +static const nxt_conn_state_t nxt_h1p_read_ws_frame_payload_state + nxt_aligned(64) = +{ + .ready_handler = nxt_h1p_conn_ws_frame_payload_read, + .close_handler = nxt_h1p_conn_ws_error, + .error_handler = nxt_h1p_conn_ws_error, + + .timer_handler = nxt_h1p_conn_ws_timeout, + .timer_value = nxt_h1p_conn_request_timer_value, + .timer_data = offsetof(nxt_socket_conf_t, websocket_conf.read_timeout), + .timer_autoreset = 1, +}; + + +static void +nxt_h1p_conn_ws_frame_payload_read(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + nxt_h1proto_t *h1p; + nxt_http_request_t *r; + nxt_event_engine_t *engine; + nxt_websocket_header_t *wsh; + + c = obj; + h1p = data; + + nxt_h1p_conn_ws_keepalive_disable(task, h1p); + + nxt_debug(task, "h1p conn ws frame read"); + + if (nxt_buf_mem_free_size(&c->read->mem) == 0) { + r = h1p->request; + if (nxt_slow_path(r == NULL)) { + return; + } + + wsh = (nxt_websocket_header_t *) r->ws_frame->mem.pos; + + nxt_h1p_conn_ws_frame_process(task, c, h1p, wsh); + + return; + } + + engine = task->thread->engine; + + nxt_conn_read(engine, c); + nxt_h1p_conn_ws_keepalive_enable(task, h1p); +} + + +static void +hxt_h1p_send_ws_error(nxt_task_t *task, nxt_http_request_t *r, + const nxt_ws_error_t *err, ...) +{ + u_char *p; + va_list args; + nxt_buf_t *out; + nxt_str_t desc; + nxt_websocket_header_t *wsh; + u_char buf[125]; + + if (nxt_slow_path(err->args)) { + va_start(args, err); + p = nxt_vsprintf(buf, buf + sizeof(buf), (char *) err->desc.start, + args); + va_end(args); + + desc.start = buf; + desc.length = p - buf; + + } else { + desc = err->desc; + } + + nxt_log(task, NXT_LOG_INFO, "websocket error %d: %V", err->code, &desc); + + out = nxt_http_buf_mem(task, r, 2 + sizeof(err->code) + desc.length); + if (nxt_slow_path(out == NULL)) { + nxt_http_request_error_handler(task, r, r->proto.any); + return; + } + + out->mem.start[0] = 0; + out->mem.start[1] = 0; + + wsh = (nxt_websocket_header_t *) out->mem.start; + p = nxt_websocket_frame_init(wsh, sizeof(err->code) + desc.length); + + wsh->fin = 1; + wsh->opcode = NXT_WEBSOCKET_OP_CLOSE; + + *p++ = (err->code >> 8) & 0xFF; + *p++ = err->code & 0xFF; + + out->mem.free = nxt_cpymem(p, desc.start, desc.length); + out->next = nxt_http_buf_last(r); + + if (out->next != NULL) { + out->next->completion_handler = nxt_h1p_conn_ws_error_sent; + } + + nxt_http_request_send(task, r, out); +} + + +static void +nxt_h1p_conn_ws_error_sent(nxt_task_t *task, void *obj, void *data) +{ + nxt_http_request_t *r; + + r = data; + + nxt_debug(task, "h1p conn ws error sent"); + + r->state->error_handler(task, r, r->proto.any); +} + + +static void +nxt_h1p_conn_ws_pong(nxt_task_t *task, void *obj, void *data) +{ + uint8_t payload_len, i; + nxt_buf_t *b, *out, *next; + nxt_http_request_t *r; + nxt_websocket_header_t *wsh; + uint8_t mask[4]; + + nxt_debug(task, "h1p conn ws pong"); + + r = obj; + b = r->ws_frame; + + wsh = (nxt_websocket_header_t *) b->mem.pos; + payload_len = wsh->payload_len; + + b->mem.pos += 2; + + nxt_memcpy(mask, b->mem.pos, 4); + + b->mem.pos += 4; + + out = nxt_http_buf_mem(task, r, 2 + payload_len); + if (nxt_slow_path(out == NULL)) { + nxt_http_request_error_handler(task, r, r->proto.any); + return; + } + + out->mem.start[0] = 0; + out->mem.start[1] = 0; + + wsh = (nxt_websocket_header_t *) out->mem.start; + out->mem.free = nxt_websocket_frame_init(wsh, payload_len); + + wsh->fin = 1; + wsh->opcode = NXT_WEBSOCKET_OP_PONG; + + for (i = 0; i < payload_len; i++) { + while (nxt_buf_mem_used_size(&b->mem) == 0) { + next = b->next; + + nxt_work_queue_add(&task->thread->engine->fast_work_queue, + b->completion_handler, task, b, b->parent); + + b = next; + } + + *out->mem.free++ = *b->mem.pos++ ^ mask[i % 4]; + } + + r->ws_frame = b; + + nxt_http_request_send(task, r, out); + + nxt_http_request_ws_frame_start(task, r, r->ws_frame); +} diff --git a/src/nxt_http.h b/src/nxt_http.h index c1a230ec..ac1eedcf 100644 --- a/src/nxt_http.h +++ b/src/nxt_http.h @@ -11,6 +11,9 @@ typedef enum { NXT_HTTP_INVALID = 0, + NXT_HTTP_CONTINUE = 100, + NXT_HTTP_SWITCHING_PROTOCOLS = 101, + NXT_HTTP_OK = 200, NXT_HTTP_NO_CONTENT = 204, @@ -26,6 +29,7 @@ typedef enum { NXT_HTTP_LENGTH_REQUIRED = 411, NXT_HTTP_PAYLOAD_TOO_LARGE = 413, NXT_HTTP_URI_TOO_LONG = 414, + NXT_HTTP_UPGRADE_REQUIRED = 426, NXT_HTTP_REQUEST_HEADER_FIELDS_TOO_LARGE = 431, NXT_HTTP_TO_HTTPS = 497, @@ -61,6 +65,13 @@ typedef struct { typedef struct nxt_h1proto_s nxt_h1proto_t; +struct nxt_h1p_websocket_timer_s { + nxt_timer_t timer; + nxt_h1proto_t *h1p; + nxt_msec_t keepalive_interval; +}; + + typedef union { void *any; nxt_h1proto_t *h1; @@ -99,6 +110,7 @@ struct nxt_http_request_s { nxt_mp_t *mem_pool; nxt_buf_t *body; + nxt_buf_t *ws_frame; nxt_buf_t *out; const nxt_http_request_state_t *state; @@ -127,6 +139,8 @@ struct nxt_http_request_s { nxt_timer_t timer; void *timer_data; + void *req_rpc_data; + nxt_buf_t *last; nxt_http_response_t resp; @@ -138,6 +152,7 @@ struct nxt_http_request_s { uint8_t logged; /* 1 bit */ uint8_t header_sent; /* 1 bit */ uint8_t error; /* 1 bit */ + uint8_t websocket_handshake; /* 1 bit */ }; @@ -166,6 +181,8 @@ typedef struct { void (*discard)(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *last); void (*close)(nxt_task_t *task, nxt_http_proto_t proto, nxt_socket_conf_joint_t *joint); + void (*ws_frame_start)(nxt_task_t *task, nxt_http_request_t *r, + nxt_buf_t *ws_frame); } nxt_http_proto_table_t; @@ -179,12 +196,15 @@ void nxt_http_request_error(nxt_task_t *task, nxt_http_request_t *r, nxt_http_status_t status); void nxt_http_request_read_body(nxt_task_t *task, nxt_http_request_t *r); void nxt_http_request_header_send(nxt_task_t *task, nxt_http_request_t *r); +void nxt_http_request_ws_frame_start(nxt_task_t *task, nxt_http_request_t *r, + nxt_buf_t *ws_frame); void nxt_http_request_send(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *out); nxt_buf_t *nxt_http_buf_mem(nxt_task_t *task, nxt_http_request_t *r, size_t size); nxt_buf_t *nxt_http_buf_last(nxt_http_request_t *r); void nxt_http_request_error_handler(nxt_task_t *task, void *obj, void *data); +void nxt_http_request_close_handler(nxt_task_t *task, void *obj, void *data); nxt_int_t nxt_http_request_host(void *ctx, nxt_http_field_t *field, uintptr_t data); @@ -212,5 +232,13 @@ extern nxt_lvlhsh_t nxt_response_fields_hash; extern const nxt_http_proto_table_t nxt_http_proto[]; +void nxt_h1p_websocket_first_frame_start(nxt_task_t *task, + nxt_http_request_t *r, nxt_buf_t *ws_frame); +void nxt_h1p_websocket_frame_start(nxt_task_t *task, nxt_http_request_t *r, + nxt_buf_t *ws_frame); +void nxt_h1p_complete_buffers(nxt_task_t *task, nxt_h1proto_t *h1p); +nxt_msec_t nxt_h1p_conn_request_timer_value(nxt_conn_t *c, uintptr_t data); + +extern const nxt_conn_state_t nxt_h1p_idle_close_state; #endif /* _NXT_HTTP_H_INCLUDED_ */ diff --git a/src/nxt_http_request.c b/src/nxt_http_request.c index 1ab22223..916004d2 100644 --- a/src/nxt_http_request.c +++ b/src/nxt_http_request.c @@ -16,8 +16,6 @@ static void nxt_http_request_proto_info(nxt_task_t *task, static void nxt_http_request_mem_buf_completion(nxt_task_t *task, void *obj, void *data); static void nxt_http_request_done(nxt_task_t *task, void *obj, void *data); -static void nxt_http_request_close_handler(nxt_task_t *task, void *obj, - void *data); static u_char *nxt_http_date(u_char *buf, nxt_realtime_t *now, struct tm *tm, size_t size, const char *format); @@ -444,6 +442,16 @@ fail: void +nxt_http_request_ws_frame_start(nxt_task_t *task, nxt_http_request_t *r, + nxt_buf_t *ws_frame) +{ + if (r->proto.any != NULL) { + nxt_http_proto[r->protocol].ws_frame_start(task, r, ws_frame); + } +} + + +void nxt_http_request_send(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *out) { if (nxt_fast_path(r->proto.any != NULL)) { @@ -530,7 +538,7 @@ nxt_http_request_error_handler(nxt_task_t *task, void *obj, void *data) } -static void +void nxt_http_request_close_handler(nxt_task_t *task, void *obj, void *data) { nxt_http_proto_t proto; @@ -556,12 +564,13 @@ nxt_http_request_close_handler(nxt_task_t *task, void *obj, void *data) } } - protocol = r->protocol; - r->proto.any = NULL; - nxt_mp_release(r->mem_pool); if (nxt_fast_path(proto.any != NULL)) { + protocol = r->protocol; + + nxt_mp_release(r->mem_pool); + nxt_http_proto[protocol].close(task, proto, conf); } } diff --git a/src/nxt_http_response.c b/src/nxt_http_response.c index 755182db..00ecff00 100644 --- a/src/nxt_http_response.c +++ b/src/nxt_http_response.c @@ -28,6 +28,8 @@ static nxt_http_field_proc_t nxt_response_fields[] = { offsetof(nxt_http_request_t, resp.content_type) }, { nxt_string("Content-Length"), &nxt_http_response_field, offsetof(nxt_http_request_t, resp.content_length) }, + { nxt_string("Upgrade"), &nxt_http_response_skip, 0 }, + { nxt_string("Sec-WebSocket-Accept"), &nxt_http_response_skip, 0 }, }; diff --git a/src/nxt_http_websocket.c b/src/nxt_http_websocket.c new file mode 100644 index 00000000..d58d615c --- /dev/null +++ b/src/nxt_http_websocket.c @@ -0,0 +1,161 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + +#include <nxt_main.h> +#include <nxt_router.h> +#include <nxt_http.h> +#include <nxt_router_request.h> +#include <nxt_port_memory_int.h> +#include <nxt_websocket.h> +#include <nxt_websocket_header.h> + + +static void nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data); +static void nxt_http_websocket_error_handler(nxt_task_t *task, void *obj, + void *data); + + +const nxt_http_request_state_t nxt_http_websocket + nxt_aligned(64) = +{ + .ready_handler = nxt_http_websocket_client, + .error_handler = nxt_http_websocket_error_handler, +}; + + +static void +nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data) +{ + size_t frame_size, used_size, copy_size, buf_free_size; + size_t chunk_copy_size; + nxt_buf_t *out, *buf, **out_tail, *b, *next; + nxt_int_t res; + nxt_http_request_t *r; + nxt_request_app_link_t *req_app_link; + nxt_request_rpc_data_t *req_rpc_data; + nxt_websocket_header_t *wsh; + + r = obj; + + if (nxt_slow_path((req_rpc_data = r->req_rpc_data) == NULL + || (req_app_link = req_rpc_data->req_app_link) == NULL)) + { + nxt_debug(task, "websocket client frame for destroyed request"); + + return; + } + + nxt_debug(task, "http websocket client frame"); + + wsh = (nxt_websocket_header_t *) r->ws_frame->mem.pos; + + frame_size = nxt_websocket_frame_header_size(wsh) + + nxt_websocket_frame_payload_len(wsh); + + buf = NULL; + buf_free_size = 0; + out = NULL; + out_tail = &out; + + b = r->ws_frame; + + while (b != NULL && frame_size > 0) { + used_size = nxt_buf_mem_used_size(&b->mem); + copy_size = nxt_min(used_size, frame_size); + + while (copy_size > 0) { + if (buf == NULL || buf_free_size == 0) { + buf_free_size = nxt_min(frame_size, PORT_MMAP_DATA_SIZE); + + buf = nxt_port_mmap_get_buf(task, req_app_link->app_port, + buf_free_size); + + *out_tail = buf; + out_tail = &buf->next; + } + + chunk_copy_size = nxt_min(buf_free_size, copy_size); + + buf->mem.free = nxt_cpymem(buf->mem.free, b->mem.pos, + chunk_copy_size); + + copy_size -= chunk_copy_size; + b->mem.pos += chunk_copy_size; + buf_free_size -= chunk_copy_size; + } + + frame_size -= copy_size; + next = b->next; + + if (nxt_buf_mem_used_size(&b->mem) == 0) { + nxt_work_queue_add(&task->thread->engine->fast_work_queue, + b->completion_handler, task, b, b->parent); + + r->ws_frame = next; + } + + b = next; + } + + res = nxt_port_socket_twrite(task, req_app_link->app_port, + NXT_PORT_MSG_WEBSOCKET, -1, + req_app_link->stream, + req_app_link->reply_port->id, out, NULL); + if (nxt_slow_path(res != NXT_OK)) { + // TODO: handle + } + + b = r->ws_frame; + + if (b != NULL) { + used_size = nxt_buf_mem_used_size(&b->mem); + + if (used_size > 0) { + nxt_memmove(b->mem.start, b->mem.pos, used_size); + + b->mem.pos = b->mem.start; + b->mem.free = b->mem.start + used_size; + } + } + + nxt_http_request_ws_frame_start(task, r, r->ws_frame); +} + + +static void +nxt_http_websocket_error_handler(nxt_task_t *task, void *obj, void *data) +{ + nxt_http_request_t *r; + nxt_request_app_link_t *req_app_link; + nxt_request_rpc_data_t *req_rpc_data; + + nxt_debug(task, "http websocket error handler"); + + r = obj; + + if ((req_rpc_data = r->req_rpc_data) == NULL) { + nxt_debug(task, " req_rpc_data is NULL"); + goto close_handler; + } + + if ((req_app_link = req_rpc_data->req_app_link) == NULL) { + nxt_debug(task, " req_app_link is NULL"); + goto close_handler; + } + + if (req_app_link->app_port == NULL) { + nxt_debug(task, " app_port is NULL"); + goto close_handler; + } + + (void) nxt_port_socket_twrite(task, req_app_link->app_port, + NXT_PORT_MSG_WEBSOCKET_LAST, + -1, req_app_link->stream, + req_app_link->reply_port->id, NULL, NULL); + +close_handler: + + nxt_http_request_close_handler(task, obj, data); +} diff --git a/src/nxt_port.c b/src/nxt_port.c index aff46666..cef65cab 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -68,6 +68,7 @@ nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid, nxt_queue_init(&port->messages); nxt_thread_mutex_create(&port->write_mutex); nxt_queue_init(&port->pending_requests); + nxt_queue_init(&port->active_websockets); } else { nxt_mp_destroy(mp); diff --git a/src/nxt_port.h b/src/nxt_port.h index e4e76693..eeb6caa5 100644 --- a/src/nxt_port.h +++ b/src/nxt_port.h @@ -36,6 +36,12 @@ struct nxt_port_handlers_s { /* Stop process command. */ nxt_port_handler_t quit; + /* Request headers. */ + nxt_port_handler_t req_headers; + + /* Websocket frame. */ + nxt_port_handler_t websocket_frame; + /* Various data. */ nxt_port_handler_t data; }; @@ -71,6 +77,9 @@ typedef enum { _NXT_PORT_MSG_REMOVE_PID = nxt_port_handler_idx(remove_pid), _NXT_PORT_MSG_QUIT = nxt_port_handler_idx(quit), + _NXT_PORT_MSG_REQ_HEADERS = nxt_port_handler_idx(req_headers), + _NXT_PORT_MSG_WEBSOCKET = nxt_port_handler_idx(websocket_frame), + _NXT_PORT_MSG_DATA = nxt_port_handler_idx(data), NXT_PORT_MSG_MAX = sizeof(nxt_port_handlers_t) @@ -99,6 +108,10 @@ typedef enum { NXT_PORT_MSG_QUIT = _NXT_PORT_MSG_QUIT | NXT_PORT_MSG_LAST, NXT_PORT_MSG_REMOVE_PID = _NXT_PORT_MSG_REMOVE_PID | NXT_PORT_MSG_LAST, + NXT_PORT_MSG_REQ_HEADERS = _NXT_PORT_MSG_REQ_HEADERS, + NXT_PORT_MSG_WEBSOCKET = _NXT_PORT_MSG_WEBSOCKET, + NXT_PORT_MSG_WEBSOCKET_LAST = _NXT_PORT_MSG_WEBSOCKET | NXT_PORT_MSG_LAST, + NXT_PORT_MSG_DATA = _NXT_PORT_MSG_DATA, NXT_PORT_MSG_DATA_LAST = _NXT_PORT_MSG_DATA | NXT_PORT_MSG_LAST, } nxt_port_msg_type_t; @@ -181,6 +194,8 @@ struct nxt_port_s { uint32_t app_responses; nxt_queue_t pending_requests; + nxt_queue_t active_websockets; + nxt_port_handler_t handler; nxt_port_handler_t *data; diff --git a/src/nxt_router.c b/src/nxt_router.c index 566e0c65..b87f588f 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -14,7 +14,7 @@ #include <nxt_port_memory_int.h> #include <nxt_unit_request.h> #include <nxt_unit_response.h> - +#include <nxt_router_request.h> typedef struct { nxt_str_t type; @@ -48,64 +48,6 @@ typedef struct { #endif -typedef struct nxt_msg_info_s { - nxt_buf_t *buf; - nxt_port_mmap_tracking_t tracking; - nxt_work_handler_t completion_handler; -} nxt_msg_info_t; - - -typedef struct nxt_request_app_link_s nxt_request_app_link_t; - - -typedef enum { - NXT_APR_NEW_PORT, - NXT_APR_REQUEST_FAILED, - NXT_APR_GOT_RESPONSE, - NXT_APR_CLOSE, -} nxt_apr_action_t; - - -typedef struct { - uint32_t stream; - nxt_app_t *app; - - nxt_port_t *app_port; - nxt_apr_action_t apr_action; - - nxt_http_request_t *request; - nxt_msg_info_t msg_info; - nxt_request_app_link_t *req_app_link; -} nxt_request_rpc_data_t; - - -struct nxt_request_app_link_s { - uint32_t stream; - nxt_atomic_t use_count; - - nxt_port_t *app_port; - nxt_apr_action_t apr_action; - - nxt_port_t *reply_port; - nxt_http_request_t *request; - nxt_msg_info_t msg_info; - nxt_request_rpc_data_t *req_rpc_data; - - nxt_nsec_t res_time; - - nxt_queue_link_t link_app_requests; /* for nxt_app_t.requests */ - /* for nxt_port_t.pending_requests */ - nxt_queue_link_t link_port_pending; - nxt_queue_link_t link_app_pending; /* for nxt_app_t.pending */ - - nxt_mp_t *mem_pool; - nxt_work_t work; - - int err_code; - const char *err_str; -}; - - typedef struct { nxt_socket_conf_t *socket_conf; nxt_router_temp_conf_t *temp_conf; @@ -305,6 +247,8 @@ static nxt_int_t nxt_router_http_request_done(nxt_task_t *task, static void nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data); +const nxt_http_request_state_t nxt_http_websocket; + static nxt_router_t *nxt_router; static const nxt_str_t http_prefix = nxt_string("HTTP_"); @@ -663,6 +607,7 @@ nxt_request_app_link_release(nxt_task_t *task, nxt_request_app_link_t *req_app_link) { nxt_mp_t *mp; + nxt_http_request_t *r; nxt_request_rpc_data_t *req_rpc_data; nxt_assert(task->thread->engine == req_app_link->work.data); @@ -683,10 +628,11 @@ nxt_request_app_link_release(nxt_task_t *task, req_rpc_data->msg_info = req_app_link->msg_info; if (req_rpc_data->app->timeout != 0) { - req_rpc_data->request->timer.handler = nxt_router_app_timeout; - req_rpc_data->request->timer_data = req_rpc_data; - nxt_timer_add(task->thread->engine, - &req_rpc_data->request->timer, + r = req_rpc_data->request; + + r->timer.handler = nxt_router_app_timeout; + r->timer_data = req_rpc_data; + nxt_timer_add(task->thread->engine, &r->timer, req_rpc_data->app->timeout); } @@ -833,14 +779,16 @@ nxt_request_rpc_data_unlink(nxt_task_t *task, if (req_app_link->link_app_requests.next == NULL && req_app_link->link_port_pending.next == NULL - && req_app_link->link_app_pending.next == NULL) + && req_app_link->link_app_pending.next == NULL + && req_app_link->link_port_websockets.next == NULL) { req_app_link = NULL; } else { ra_use_delta -= nxt_queue_chk_remove(&req_app_link->link_app_requests) - + nxt_queue_chk_remove(&req_app_link->link_port_pending); + + nxt_queue_chk_remove(&req_app_link->link_port_pending) + + nxt_queue_chk_remove(&req_app_link->link_port_websockets); nxt_queue_chk_remove(&req_app_link->link_app_pending); } @@ -863,6 +811,7 @@ nxt_request_rpc_data_unlink(nxt_task_t *task, nxt_router_http_request_done(task, req_rpc_data->request); + req_rpc_data->request->req_rpc_data = NULL; req_rpc_data->request = NULL; } } @@ -1412,6 +1361,28 @@ static nxt_conf_map_t nxt_router_http_conf[] = { }; +static nxt_conf_map_t nxt_router_websocket_conf[] = { + { + nxt_string("max_frame_size"), + NXT_CONF_MAP_SIZE, + offsetof(nxt_websocket_conf_t, max_frame_size), + }, + + { + nxt_string("read_timeout"), + NXT_CONF_MAP_MSEC, + offsetof(nxt_websocket_conf_t, read_timeout), + }, + + { + nxt_string("keepalive_interval"), + NXT_CONF_MAP_MSEC, + offsetof(nxt_websocket_conf_t, keepalive_interval), + }, + +}; + + static nxt_int_t nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end) @@ -1425,7 +1396,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_app_t *app, *prev; nxt_router_t *router; nxt_app_joint_t *app_joint; - nxt_conf_value_t *conf, *http, *value; + nxt_conf_value_t *conf, *http, *value, *websocket; nxt_conf_value_t *applications, *application; nxt_conf_value_t *listeners, *listener; nxt_conf_value_t *routes_conf; @@ -1448,6 +1419,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, #if (NXT_TLS) static nxt_str_t certificate_path = nxt_string("/tls/certificate"); #endif + static nxt_str_t websocket_path = nxt_string("/settings/http/websocket"); conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL); if (conf == NULL) { @@ -1658,6 +1630,8 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, } #endif + websocket = nxt_conf_get_path(conf, &websocket_path); + listeners = nxt_conf_get_path(conf, &listeners_path); if (listeners != NULL) { @@ -1697,6 +1671,10 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, skcf->body_read_timeout = 30 * 1000; skcf->send_timeout = 30 * 1000; + skcf->websocket_conf.max_frame_size = 1024 * 1024; + skcf->websocket_conf.read_timeout = 60 * 1000; + skcf->websocket_conf.keepalive_interval = 30 * 1000; + if (http != NULL) { ret = nxt_conf_map_object(mp, http, nxt_router_http_conf, nxt_nitems(nxt_router_http_conf), @@ -1707,6 +1685,17 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, } } + if (websocket != NULL) { + ret = nxt_conf_map_object(mp, websocket, + nxt_router_websocket_conf, + nxt_nitems(nxt_router_websocket_conf), + &skcf->websocket_conf); + if (ret != NXT_OK) { + nxt_alert(task, "websocket map error"); + goto fail; + } + } + #if (NXT_TLS) value = nxt_conf_get_path(listener, &certificate_path); @@ -3418,10 +3407,12 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, { nxt_int_t ret; nxt_buf_t *b; + nxt_port_t *app_port; nxt_unit_field_t *f; nxt_http_field_t *field; nxt_http_request_t *r; nxt_unit_response_t *resp; + nxt_request_app_link_t *req_app_link; nxt_request_rpc_data_t *req_rpc_data; b = msg->buf; @@ -3542,7 +3533,48 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_http_request_header_send(task, r); - r->state = &nxt_http_request_send_state; + if (r->websocket_handshake + && r->status == NXT_HTTP_SWITCHING_PROTOCOLS) + { + req_app_link = nxt_request_app_link_alloc(task, + req_rpc_data->req_app_link, + req_rpc_data); + if (nxt_slow_path(req_app_link == NULL)) { + goto fail; + } + + app_port = req_app_link->app_port; + + if (app_port == NULL && req_rpc_data->app_port != NULL) { + req_app_link->app_port = req_rpc_data->app_port; + app_port = req_app_link->app_port; + req_app_link->apr_action = req_rpc_data->apr_action; + + req_rpc_data->app_port = NULL; + } + + if (nxt_slow_path(app_port == NULL)) { + goto fail; + } + + nxt_thread_mutex_lock(&req_rpc_data->app->mutex); + + nxt_queue_insert_tail(&app_port->active_websockets, + &req_app_link->link_port_websockets); + + nxt_thread_mutex_unlock(&req_rpc_data->app->mutex); + + nxt_router_app_port_release(task, app_port, NXT_APR_UPGRADE); + req_app_link->apr_action = NXT_APR_CLOSE; + + nxt_debug(task, "req_app_link stream #%uD upgrade", + req_app_link->stream); + + r->state = &nxt_http_websocket; + + } else { + r->state = &nxt_http_request_send_state; + } if (r->out) { nxt_work_queue_add(&task->thread->engine->fast_work_queue, @@ -3924,6 +3956,10 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, got_response = 1; inc_use = -1; break; + case NXT_APR_UPGRADE: + dec_pending = 1; + got_response = 1; + break; case NXT_APR_CLOSE: inc_use = -1; break; @@ -4046,9 +4082,10 @@ re_ra_cancelled: adjust_idle_timer = 0; - if (port->pair[1] != -1 && !send_quit && port->app_pending_responses == 0) { - nxt_assert(port->idle_link.next == NULL); - + if (port->pair[1] != -1 && !send_quit && port->app_pending_responses == 0 + && nxt_queue_is_empty(&port->active_websockets) + && port->idle_link.next == NULL) + { if (app->idle_processes == app->spare_processes && app->adjust_idle_work.data == NULL) { @@ -4545,6 +4582,7 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r, nxt_router_app_use(task, app, 1); req_rpc_data->request = r; + r->req_rpc_data = req_rpc_data; req_app_link = &ra_local; nxt_request_app_link_init(task, req_app_link, req_rpc_data); @@ -4635,7 +4673,7 @@ nxt_router_app_prepare_request(nxt_task_t *task, goto release_port; } - res = nxt_port_socket_twrite(task, port, NXT_PORT_MSG_DATA, + res = nxt_port_socket_twrite(task, port, NXT_PORT_MSG_REQ_HEADERS, -1, req_app_link->stream, reply_port->id, buf, &req_app_link->msg_info.tracking); @@ -4785,6 +4823,7 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r, *p++ = '\0'; req->tls = (r->tls != NULL); + req->websocket_handshake = r->websocket_handshake; req->server_name_length = r->server_name.length; nxt_unit_sptr_set(&req->server_name, p); diff --git a/src/nxt_router.h b/src/nxt_router.h index ff791e3d..b55a4de3 100644 --- a/src/nxt_router.h +++ b/src/nxt_router.h @@ -139,6 +139,13 @@ struct nxt_app_s { typedef struct { + size_t max_frame_size; + nxt_msec_t read_timeout; + nxt_msec_t keepalive_interval; +} nxt_websocket_conf_t; + + +typedef struct { uint32_t count; nxt_queue_link_t link; nxt_router_conf_t *router_conf; @@ -164,6 +171,8 @@ typedef struct { nxt_msec_t body_read_timeout; nxt_msec_t send_timeout; + nxt_websocket_conf_t websocket_conf; + #if (NXT_TLS) nxt_tls_conf_t *tls; #endif diff --git a/src/nxt_router_request.h b/src/nxt_router_request.h new file mode 100644 index 00000000..c3d5767e --- /dev/null +++ b/src/nxt_router_request.h @@ -0,0 +1,71 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + +#ifndef _NXT_ROUTER_REQUEST_H_INCLUDED_ +#define _NXT_ROUTER_REQUEST_H_INCLUDED_ + + +typedef struct nxt_msg_info_s { + nxt_buf_t *buf; + nxt_port_mmap_tracking_t tracking; + nxt_work_handler_t completion_handler; +} nxt_msg_info_t; + + +typedef struct nxt_request_app_link_s nxt_request_app_link_t; + + +typedef enum { + NXT_APR_NEW_PORT, + NXT_APR_REQUEST_FAILED, + NXT_APR_GOT_RESPONSE, + NXT_APR_UPGRADE, + NXT_APR_CLOSE, +} nxt_apr_action_t; + + +typedef struct { + uint32_t stream; + nxt_app_t *app; + + nxt_port_t *app_port; + nxt_apr_action_t apr_action; + + nxt_http_request_t *request; + nxt_msg_info_t msg_info; + nxt_request_app_link_t *req_app_link; +} nxt_request_rpc_data_t; + + +struct nxt_request_app_link_s { + uint32_t stream; + nxt_atomic_t use_count; + + nxt_port_t *app_port; + nxt_apr_action_t apr_action; + + nxt_port_t *reply_port; + nxt_http_request_t *request; + nxt_msg_info_t msg_info; + nxt_request_rpc_data_t *req_rpc_data; + + nxt_nsec_t res_time; + + nxt_queue_link_t link_app_requests; /* for nxt_app_t.requests */ + /* for nxt_port_t.pending_requests */ + nxt_queue_link_t link_port_pending; + nxt_queue_link_t link_app_pending; /* for nxt_app_t.pending */ + /* for nxt_port_t.active_websockets */ + nxt_queue_link_t link_port_websockets; + + nxt_mp_t *mem_pool; + nxt_work_t work; + + int err_code; + const char *err_str; +}; + + +#endif /* _NXT_ROUTER_REQUEST_H_INCLUDED_ */ diff --git a/src/nxt_sha1.c b/src/nxt_sha1.c new file mode 100644 index 00000000..407c5933 --- /dev/null +++ b/src/nxt_sha1.c @@ -0,0 +1,295 @@ + +/* + * Copyright (C) Maxim Dounin + * Copyright (C) NGINX, Inc. + * + * An internal SHA1 implementation. + */ + + +#include <nxt_main.h> +#include <nxt_sha1.h> + + +static const u_char *nxt_sha1_body(nxt_sha1_t *ctx, const u_char *data, + size_t size); + + +void +nxt_sha1_init(nxt_sha1_t *ctx) +{ + ctx->a = 0x67452301; + ctx->b = 0xefcdab89; + ctx->c = 0x98badcfe; + ctx->d = 0x10325476; + ctx->e = 0xc3d2e1f0; + + ctx->bytes = 0; +} + + +void +nxt_sha1_update(nxt_sha1_t *ctx, const void *data, size_t size) +{ + size_t used, free; + + used = (size_t) (ctx->bytes & 0x3f); + ctx->bytes += size; + + if (used) { + free = 64 - used; + + if (size < free) { + memcpy(&ctx->buffer[used], data, size); + return; + } + + memcpy(&ctx->buffer[used], data, free); + data = (u_char *) data + free; + size -= free; + (void) nxt_sha1_body(ctx, ctx->buffer, 64); + } + + if (size >= 64) { + data = nxt_sha1_body(ctx, data, size & ~(size_t) 0x3f); + size &= 0x3f; + } + + memcpy(ctx->buffer, data, size); +} + + +void +nxt_sha1_final(u_char result[20], nxt_sha1_t *ctx) +{ + size_t used, free; + + used = (size_t) (ctx->bytes & 0x3f); + + ctx->buffer[used++] = 0x80; + + free = 64 - used; + + if (free < 8) { + nxt_memzero(&ctx->buffer[used], free); + (void) nxt_sha1_body(ctx, ctx->buffer, 64); + used = 0; + free = 64; + } + + nxt_memzero(&ctx->buffer[used], free - 8); + + ctx->bytes <<= 3; + ctx->buffer[56] = (u_char) (ctx->bytes >> 56); + ctx->buffer[57] = (u_char) (ctx->bytes >> 48); + ctx->buffer[58] = (u_char) (ctx->bytes >> 40); + ctx->buffer[59] = (u_char) (ctx->bytes >> 32); + ctx->buffer[60] = (u_char) (ctx->bytes >> 24); + ctx->buffer[61] = (u_char) (ctx->bytes >> 16); + ctx->buffer[62] = (u_char) (ctx->bytes >> 8); + ctx->buffer[63] = (u_char) ctx->bytes; + + (void) nxt_sha1_body(ctx, ctx->buffer, 64); + + result[0] = (u_char) (ctx->a >> 24); + result[1] = (u_char) (ctx->a >> 16); + result[2] = (u_char) (ctx->a >> 8); + result[3] = (u_char) ctx->a; + result[4] = (u_char) (ctx->b >> 24); + result[5] = (u_char) (ctx->b >> 16); + result[6] = (u_char) (ctx->b >> 8); + result[7] = (u_char) ctx->b; + result[8] = (u_char) (ctx->c >> 24); + result[9] = (u_char) (ctx->c >> 16); + result[10] = (u_char) (ctx->c >> 8); + result[11] = (u_char) ctx->c; + result[12] = (u_char) (ctx->d >> 24); + result[13] = (u_char) (ctx->d >> 16); + result[14] = (u_char) (ctx->d >> 8); + result[15] = (u_char) ctx->d; + result[16] = (u_char) (ctx->e >> 24); + result[17] = (u_char) (ctx->e >> 16); + result[18] = (u_char) (ctx->e >> 8); + result[19] = (u_char) ctx->e; + + nxt_memzero(ctx, sizeof(*ctx)); +} + + +/* + * Helper functions. + */ + +#define ROTATE(bits, word) (((word) << (bits)) | ((word) >> (32 - (bits)))) + +#define F1(b, c, d) (((b) & (c)) | ((~(b)) & (d))) +#define F2(b, c, d) ((b) ^ (c) ^ (d)) +#define F3(b, c, d) (((b) & (c)) | ((b) & (d)) | ((c) & (d))) + +#define STEP(f, a, b, c, d, e, w, t) \ + temp = ROTATE(5, (a)) + f((b), (c), (d)) + (e) + (w) + (t); \ + (e) = (d); \ + (d) = (c); \ + (c) = ROTATE(30, (b)); \ + (b) = (a); \ + (a) = temp; + + +/* + * GET() reads 4 input bytes in big-endian byte order and returns + * them as uint32_t. + */ + +#define GET(n) \ + ( ((uint32_t) p[n * 4 + 3]) \ + | ((uint32_t) p[n * 4 + 2] << 8) \ + | ((uint32_t) p[n * 4 + 1] << 16) \ + | ((uint32_t) p[n * 4] << 24)) + + +/* + * This processes one or more 64-byte data blocks, but does not update + * the bit counters. There are no alignment requirements. + */ + +static const u_char * +nxt_sha1_body(nxt_sha1_t *ctx, const u_char *data, size_t size) +{ + uint32_t a, b, c, d, e, temp; + uint32_t saved_a, saved_b, saved_c, saved_d, saved_e; + uint32_t words[80]; + nxt_uint_t i; + const u_char *p; + + p = data; + + a = ctx->a; + b = ctx->b; + c = ctx->c; + d = ctx->d; + e = ctx->e; + + do { + saved_a = a; + saved_b = b; + saved_c = c; + saved_d = d; + saved_e = e; + + /* Load data block into the words array */ + + for (i = 0; i < 16; i++) { + words[i] = GET(i); + } + + for (i = 16; i < 80; i++) { + words[i] = ROTATE(1, words[i - 3] + ^ words[i - 8] + ^ words[i - 14] + ^ words[i - 16]); + } + + /* Transformations */ + + STEP(F1, a, b, c, d, e, words[0], 0x5a827999); + STEP(F1, a, b, c, d, e, words[1], 0x5a827999); + STEP(F1, a, b, c, d, e, words[2], 0x5a827999); + STEP(F1, a, b, c, d, e, words[3], 0x5a827999); + STEP(F1, a, b, c, d, e, words[4], 0x5a827999); + STEP(F1, a, b, c, d, e, words[5], 0x5a827999); + STEP(F1, a, b, c, d, e, words[6], 0x5a827999); + STEP(F1, a, b, c, d, e, words[7], 0x5a827999); + STEP(F1, a, b, c, d, e, words[8], 0x5a827999); + STEP(F1, a, b, c, d, e, words[9], 0x5a827999); + STEP(F1, a, b, c, d, e, words[10], 0x5a827999); + STEP(F1, a, b, c, d, e, words[11], 0x5a827999); + STEP(F1, a, b, c, d, e, words[12], 0x5a827999); + STEP(F1, a, b, c, d, e, words[13], 0x5a827999); + STEP(F1, a, b, c, d, e, words[14], 0x5a827999); + STEP(F1, a, b, c, d, e, words[15], 0x5a827999); + STEP(F1, a, b, c, d, e, words[16], 0x5a827999); + STEP(F1, a, b, c, d, e, words[17], 0x5a827999); + STEP(F1, a, b, c, d, e, words[18], 0x5a827999); + STEP(F1, a, b, c, d, e, words[19], 0x5a827999); + + STEP(F2, a, b, c, d, e, words[20], 0x6ed9eba1); + STEP(F2, a, b, c, d, e, words[21], 0x6ed9eba1); + STEP(F2, a, b, c, d, e, words[22], 0x6ed9eba1); + STEP(F2, a, b, c, d, e, words[23], 0x6ed9eba1); + STEP(F2, a, b, c, d, e, words[24], 0x6ed9eba1); + STEP(F2, a, b, c, d, e, words[25], 0x6ed9eba1); + STEP(F2, a, b, c, d, e, words[26], 0x6ed9eba1); + STEP(F2, a, b, c, d, e, words[27], 0x6ed9eba1); + STEP(F2, a, b, c, d, e, words[28], 0x6ed9eba1); + STEP(F2, a, b, c, d, e, words[29], 0x6ed9eba1); + STEP(F2, a, b, c, d, e, words[30], 0x6ed9eba1); + STEP(F2, a, b, c, d, e, words[31], 0x6ed9eba1); + STEP(F2, a, b, c, d, e, words[32], 0x6ed9eba1); + STEP(F2, a, b, c, d, e, words[33], 0x6ed9eba1); + STEP(F2, a, b, c, d, e, words[34], 0x6ed9eba1); + STEP(F2, a, b, c, d, e, words[35], 0x6ed9eba1); + STEP(F2, a, b, c, d, e, words[36], 0x6ed9eba1); + STEP(F2, a, b, c, d, e, words[37], 0x6ed9eba1); + STEP(F2, a, b, c, d, e, words[38], 0x6ed9eba1); + STEP(F2, a, b, c, d, e, words[39], 0x6ed9eba1); + + STEP(F3, a, b, c, d, e, words[40], 0x8f1bbcdc); + STEP(F3, a, b, c, d, e, words[41], 0x8f1bbcdc); + STEP(F3, a, b, c, d, e, words[42], 0x8f1bbcdc); + STEP(F3, a, b, c, d, e, words[43], 0x8f1bbcdc); + STEP(F3, a, b, c, d, e, words[44], 0x8f1bbcdc); + STEP(F3, a, b, c, d, e, words[45], 0x8f1bbcdc); + STEP(F3, a, b, c, d, e, words[46], 0x8f1bbcdc); + STEP(F3, a, b, c, d, e, words[47], 0x8f1bbcdc); + STEP(F3, a, b, c, d, e, words[48], 0x8f1bbcdc); + STEP(F3, a, b, c, d, e, words[49], 0x8f1bbcdc); + STEP(F3, a, b, c, d, e, words[50], 0x8f1bbcdc); + STEP(F3, a, b, c, d, e, words[51], 0x8f1bbcdc); + STEP(F3, a, b, c, d, e, words[52], 0x8f1bbcdc); + STEP(F3, a, b, c, d, e, words[53], 0x8f1bbcdc); + STEP(F3, a, b, c, d, e, words[54], 0x8f1bbcdc); + STEP(F3, a, b, c, d, e, words[55], 0x8f1bbcdc); + STEP(F3, a, b, c, d, e, words[56], 0x8f1bbcdc); + STEP(F3, a, b, c, d, e, words[57], 0x8f1bbcdc); + STEP(F3, a, b, c, d, e, words[58], 0x8f1bbcdc); + STEP(F3, a, b, c, d, e, words[59], 0x8f1bbcdc); + + STEP(F2, a, b, c, d, e, words[60], 0xca62c1d6); + STEP(F2, a, b, c, d, e, words[61], 0xca62c1d6); + STEP(F2, a, b, c, d, e, words[62], 0xca62c1d6); + STEP(F2, a, b, c, d, e, words[63], 0xca62c1d6); + STEP(F2, a, b, c, d, e, words[64], 0xca62c1d6); + STEP(F2, a, b, c, d, e, words[65], 0xca62c1d6); + STEP(F2, a, b, c, d, e, words[66], 0xca62c1d6); + STEP(F2, a, b, c, d, e, words[67], 0xca62c1d6); + STEP(F2, a, b, c, d, e, words[68], 0xca62c1d6); + STEP(F2, a, b, c, d, e, words[69], 0xca62c1d6); + STEP(F2, a, b, c, d, e, words[70], 0xca62c1d6); + STEP(F2, a, b, c, d, e, words[71], 0xca62c1d6); + STEP(F2, a, b, c, d, e, words[72], 0xca62c1d6); + STEP(F2, a, b, c, d, e, words[73], 0xca62c1d6); + STEP(F2, a, b, c, d, e, words[74], 0xca62c1d6); + STEP(F2, a, b, c, d, e, words[75], 0xca62c1d6); + STEP(F2, a, b, c, d, e, words[76], 0xca62c1d6); + STEP(F2, a, b, c, d, e, words[77], 0xca62c1d6); + STEP(F2, a, b, c, d, e, words[78], 0xca62c1d6); + STEP(F2, a, b, c, d, e, words[79], 0xca62c1d6); + + a += saved_a; + b += saved_b; + c += saved_c; + d += saved_d; + e += saved_e; + + p += 64; + + } while (size -= 64); + + ctx->a = a; + ctx->b = b; + ctx->c = c; + ctx->d = d; + ctx->e = e; + + return p; +} diff --git a/src/nxt_sha1.h b/src/nxt_sha1.h new file mode 100644 index 00000000..2816982b --- /dev/null +++ b/src/nxt_sha1.h @@ -0,0 +1,24 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + + +#ifndef _NXT_SHA1_H_INCLUDED_ +#define _NXT_SHA1_H_INCLUDED_ + + +typedef struct { + uint64_t bytes; + uint32_t a, b, c, d, e; + u_char buffer[64]; +} nxt_sha1_t; + + +NXT_EXPORT void nxt_sha1_init(nxt_sha1_t *ctx); +NXT_EXPORT void nxt_sha1_update(nxt_sha1_t *ctx, const void *data, size_t size); +NXT_EXPORT void nxt_sha1_final(u_char result[20], nxt_sha1_t *ctx); + + +#endif /* _NXT_SHA1_H_INCLUDED_ */ diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 88c7fa6a..28a0de20 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -11,38 +11,60 @@ #include "nxt_unit.h" #include "nxt_unit_request.h" #include "nxt_unit_response.h" +#include "nxt_unit_websocket.h" + +#include "nxt_websocket.h" #if (NXT_HAVE_MEMFD_CREATE) #include <linux/memfd.h> #endif -typedef struct nxt_unit_impl_s nxt_unit_impl_t; -typedef struct nxt_unit_mmap_s nxt_unit_mmap_t; -typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t; -typedef struct nxt_unit_process_s nxt_unit_process_t; -typedef struct nxt_unit_mmap_buf_s nxt_unit_mmap_buf_t; -typedef struct nxt_unit_recv_msg_s nxt_unit_recv_msg_t; -typedef struct nxt_unit_ctx_impl_s nxt_unit_ctx_impl_t; -typedef struct nxt_unit_port_impl_s nxt_unit_port_impl_t; -typedef struct nxt_unit_request_info_impl_s nxt_unit_request_info_impl_t; +typedef struct nxt_unit_impl_s nxt_unit_impl_t; +typedef struct nxt_unit_mmap_s nxt_unit_mmap_t; +typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t; +typedef struct nxt_unit_process_s nxt_unit_process_t; +typedef struct nxt_unit_mmap_buf_s nxt_unit_mmap_buf_t; +typedef struct nxt_unit_recv_msg_s nxt_unit_recv_msg_t; +typedef struct nxt_unit_ctx_impl_s nxt_unit_ctx_impl_t; +typedef struct nxt_unit_port_impl_s nxt_unit_port_impl_t; +typedef struct nxt_unit_request_info_impl_s nxt_unit_request_info_impl_t; +typedef struct nxt_unit_websocket_frame_impl_s nxt_unit_websocket_frame_impl_t; static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init); static void nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, void *data); +nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head, + nxt_unit_mmap_buf_t *mmap_buf); +nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev, + nxt_unit_mmap_buf_t *mmap_buf); +nxt_inline void nxt_unit_mmap_buf_remove(nxt_unit_mmap_buf_t *mmap_buf); static int nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream); static int nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, uint32_t stream); +static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, + nxt_unit_recv_msg_t *recv_msg); +static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, + nxt_unit_recv_msg_t *recv_msg); +static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, + nxt_unit_recv_msg_t *recv_msg); static nxt_unit_request_info_impl_t *nxt_unit_request_info_get( nxt_unit_ctx_t *ctx); static void nxt_unit_request_info_release(nxt_unit_request_info_t *req); static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req); +static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get( + nxt_unit_ctx_t *ctx); +static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws); +static void nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws); static nxt_unit_process_t *nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx); static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf); static int nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, nxt_unit_mmap_buf_t *mmap_buf, int last); +static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf); +static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, + size_t size); static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, nxt_chunk_id_t *c, int n); @@ -65,7 +87,7 @@ static nxt_port_mmap_header_t *nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx, static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, - nxt_unit_recv_msg_t *recv_msg, nxt_queue_t *incoming_buf); + nxt_unit_recv_msg_t *recv_msg); static int nxt_unit_mmap_release(nxt_port_mmap_header_t *hdr, void *start, uint32_t size); @@ -98,14 +120,22 @@ static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, static nxt_unit_port_impl_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id, int remove); +static int nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash, + nxt_unit_request_info_impl_t *req_impl); +static nxt_unit_request_info_impl_t *nxt_unit_request_hash_find( + nxt_lvlhsh_t *request_hash, uint32_t stream, int remove); + static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level); struct nxt_unit_mmap_buf_s { nxt_unit_buf_t buf; + nxt_unit_mmap_buf_t *next; + nxt_unit_mmap_buf_t **prev; + nxt_port_mmap_header_t *hdr; - nxt_queue_link_t link; +// nxt_queue_link_t link; nxt_unit_port_id_t port_id; nxt_unit_request_info_t *req; nxt_unit_ctx_impl_t *ctx_impl; @@ -113,12 +143,20 @@ struct nxt_unit_mmap_buf_s { struct nxt_unit_recv_msg_s { - nxt_port_msg_t port_msg; + uint32_t stream; + nxt_pid_t pid; + nxt_port_id_t reply_port; + + uint8_t last; /* 1 bit */ + uint8_t mmap; /* 1 bit */ void *start; uint32_t size; + int fd; nxt_unit_process_t *process; + + nxt_unit_mmap_buf_t *incoming_buf; }; @@ -127,18 +165,22 @@ typedef enum { NXT_UNIT_RS_RESPONSE_INIT, NXT_UNIT_RS_RESPONSE_HAS_CONTENT, NXT_UNIT_RS_RESPONSE_SENT, - NXT_UNIT_RS_DONE, + NXT_UNIT_RS_RELEASED, } nxt_unit_req_state_t; struct nxt_unit_request_info_impl_s { nxt_unit_request_info_t req; - nxt_unit_recv_msg_t recv_msg; - nxt_queue_t outgoing_buf; /* of nxt_unit_mmap_buf_t */ - nxt_queue_t incoming_buf; /* of nxt_unit_mmap_buf_t */ + uint32_t stream; + + nxt_unit_process_t *process; + + nxt_unit_mmap_buf_t *outgoing_buf; + nxt_unit_mmap_buf_t *incoming_buf; nxt_unit_req_state_t state; + uint8_t websocket; nxt_queue_link_t link; @@ -146,6 +188,19 @@ struct nxt_unit_request_info_impl_s { }; +struct nxt_unit_websocket_frame_impl_s { + nxt_unit_websocket_frame_t ws; + + nxt_unit_mmap_buf_t *buf; + + nxt_queue_link_t link; + + nxt_unit_ctx_impl_t *ctx_impl; + + void *retain_buf; +}; + + struct nxt_unit_ctx_impl_s { nxt_unit_ctx_t ctx; @@ -154,14 +209,20 @@ struct nxt_unit_ctx_impl_s { nxt_queue_link_t link; - nxt_queue_t free_buf; /* of nxt_unit_mmap_buf_t */ + nxt_unit_mmap_buf_t *free_buf; /* of nxt_unit_request_info_impl_t */ nxt_queue_t free_req; + /* of nxt_unit_websocket_frame_impl_t */ + nxt_queue_t free_ws; + /* of nxt_unit_request_info_impl_t */ nxt_queue_t active_req; + /* of nxt_unit_request_info_impl_t */ + nxt_lvlhsh_t requests; + nxt_unit_mmap_buf_t ctx_buf[2]; nxt_unit_request_info_impl_t req; @@ -394,18 +455,65 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link); - nxt_queue_init(&ctx_impl->free_buf); nxt_queue_init(&ctx_impl->free_req); + nxt_queue_init(&ctx_impl->free_ws); nxt_queue_init(&ctx_impl->active_req); - nxt_queue_insert_tail(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0].link); - nxt_queue_insert_tail(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1].link); + ctx_impl->free_buf = NULL; + nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]); + nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]); + nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link); ctx_impl->req.req.ctx = &ctx_impl->ctx; ctx_impl->req.req.unit = &lib->unit; ctx_impl->read_port_fd = -1; + ctx_impl->requests.slot = 0; +} + + +nxt_inline void +nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head, + nxt_unit_mmap_buf_t *mmap_buf) +{ + mmap_buf->next = *head; + + if (mmap_buf->next != NULL) { + mmap_buf->next->prev = &mmap_buf->next; + } + + *head = mmap_buf; + mmap_buf->prev = head; +} + + +nxt_inline void +nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev, + nxt_unit_mmap_buf_t *mmap_buf) +{ + while (*prev != NULL) { + prev = &(*prev)->next; + } + + nxt_unit_mmap_buf_insert(prev, mmap_buf); +} + + +nxt_inline void +nxt_unit_mmap_buf_remove(nxt_unit_mmap_buf_t *mmap_buf) +{ + nxt_unit_mmap_buf_t **prev; + + prev = mmap_buf->prev; + + if (mmap_buf->next != NULL) { + mmap_buf->next->prev = prev; + } + + if (prev != NULL) { + *prev = mmap_buf->next; + } } @@ -509,26 +617,18 @@ int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, void *buf, size_t buf_size, void *oob, size_t oob_size) { - int fd, rc, nb; - pid_t pid; - nxt_queue_t incoming_buf; - struct cmsghdr *cm; - nxt_port_msg_t *port_msg; - nxt_unit_impl_t *lib; - nxt_unit_port_t new_port; - nxt_queue_link_t *lnk; - nxt_unit_request_t *r; - nxt_unit_mmap_buf_t *b; - nxt_unit_recv_msg_t recv_msg; - nxt_unit_callbacks_t *cb; - nxt_port_msg_new_port_t *new_port_msg; - nxt_unit_request_info_t *req; - nxt_unit_request_info_impl_t *req_impl; + int rc; + pid_t pid; + struct cmsghdr *cm; + nxt_port_msg_t *port_msg; + nxt_unit_impl_t *lib; + nxt_unit_recv_msg_t recv_msg; + nxt_unit_callbacks_t *cb; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); rc = NXT_UNIT_ERROR; - fd = -1; + recv_msg.fd = -1; recv_msg.process = NULL; port_msg = buf; cm = oob; @@ -538,17 +638,22 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, && cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SCM_RIGHTS) { - memcpy(&fd, CMSG_DATA(cm), sizeof(int)); + memcpy(&recv_msg.fd, CMSG_DATA(cm), sizeof(int)); } - nxt_queue_init(&incoming_buf); + recv_msg.incoming_buf = NULL; if (nxt_slow_path(buf_size < sizeof(nxt_port_msg_t))) { nxt_unit_warn(ctx, "message too small (%d bytes)", (int) buf_size); goto fail; } - recv_msg.port_msg = *port_msg; + recv_msg.stream = port_msg->stream; + recv_msg.pid = port_msg->pid; + recv_msg.reply_port = port_msg->reply_port; + recv_msg.last = port_msg->last; + recv_msg.mmap = port_msg->mmap; + recv_msg.start = port_msg + 1; recv_msg.size = buf_size - sizeof(nxt_port_msg_t); @@ -572,7 +677,7 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, } if (port_msg->mmap) { - if (nxt_unit_mmap_read(ctx, &recv_msg, &incoming_buf) != NXT_UNIT_OK) { + if (nxt_unit_mmap_read(ctx, &recv_msg) != NXT_UNIT_OK) { goto fail; } } @@ -589,187 +694,326 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, break; case _NXT_PORT_MSG_NEW_PORT: - if (nxt_slow_path(recv_msg.size != sizeof(nxt_port_msg_new_port_t))) { - nxt_unit_warn(ctx, "#%"PRIu32": new_port: " - "invalid message size (%d)", - port_msg->stream, (int) recv_msg.size); + rc = nxt_unit_process_new_port(ctx, &recv_msg); + break; - goto fail; - } + case _NXT_PORT_MSG_CHANGE_FILE: + nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d", + port_msg->stream, recv_msg.fd); + break; - if (nxt_slow_path(fd < 0)) { - nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port", - port_msg->stream, fd); + case _NXT_PORT_MSG_MMAP: + if (nxt_slow_path(recv_msg.fd < 0)) { + nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap", + port_msg->stream, recv_msg.fd); goto fail; } - new_port_msg = recv_msg.start; + rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd); + break; - nxt_unit_debug(ctx, "#%"PRIu32": new_port: %d,%d fd %d", - port_msg->stream, (int) new_port_msg->pid, - (int) new_port_msg->id, fd); + case _NXT_PORT_MSG_REQ_HEADERS: + rc = nxt_unit_process_req_headers(ctx, &recv_msg); + break; - nb = 0; + case _NXT_PORT_MSG_WEBSOCKET: + rc = nxt_unit_process_websocket(ctx, &recv_msg); + break; - if (nxt_slow_path(ioctl(fd, FIONBIO, &nb) == -1)) { - nxt_unit_alert(ctx, "#%"PRIu32": new_port: ioctl(%d, FIONBIO, 0) " - "failed: %s (%d)", fd, strerror(errno), errno); + case _NXT_PORT_MSG_REMOVE_PID: + if (nxt_slow_path(recv_msg.size != sizeof(pid))) { + nxt_unit_warn(ctx, "#%"PRIu32": remove_pid: invalid message size " + "(%d != %d)", port_msg->stream, (int) recv_msg.size, + (int) sizeof(pid)); goto fail; } - nxt_unit_port_id_init(&new_port.id, new_port_msg->pid, - new_port_msg->id); + memcpy(&pid, recv_msg.start, sizeof(pid)); - new_port.in_fd = -1; - new_port.out_fd = fd; - new_port.data = NULL; + nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d", + port_msg->stream, (int) pid); - fd = -1; + cb->remove_pid(ctx, pid); - rc = cb->add_port(ctx, &new_port); + rc = NXT_UNIT_OK; break; - case _NXT_PORT_MSG_CHANGE_FILE: - nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d", - port_msg->stream, fd); - break; + default: + nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d", + port_msg->stream, (int) port_msg->type); - case _NXT_PORT_MSG_MMAP: - if (nxt_slow_path(fd < 0)) { - nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap", - port_msg->stream, fd); + goto fail; + } - goto fail; - } +fail: - rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, fd); - break; + if (recv_msg.fd != -1) { + close(recv_msg.fd); + } - case _NXT_PORT_MSG_DATA: - if (nxt_slow_path(port_msg->mmap == 0)) { - nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory", - port_msg->stream); + while (recv_msg.incoming_buf != NULL) { + nxt_unit_mmap_buf_free(recv_msg.incoming_buf); + } - goto fail; - } + if (recv_msg.process != NULL) { + nxt_unit_process_use(ctx, recv_msg.process, -1); + } - if (nxt_slow_path(recv_msg.size < sizeof(nxt_unit_request_t))) { - nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least " - "%d expected", port_msg->stream, (int) recv_msg.size, - (int) sizeof(nxt_unit_request_t)); + return rc; +} - goto fail; - } - req_impl = nxt_unit_request_info_get(ctx); - if (nxt_slow_path(req_impl == NULL)) { - nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed", - port_msg->stream); +static int +nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) +{ + int nb; + nxt_unit_impl_t *lib; + nxt_unit_port_t new_port; + nxt_port_msg_new_port_t *new_port_msg; - goto fail; - } + if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) { + nxt_unit_warn(ctx, "#%"PRIu32": new_port: " + "invalid message size (%d)", + recv_msg->stream, (int) recv_msg->size); - req = &req_impl->req; + return NXT_UNIT_ERROR; + } - req->request_port = *port_id; + if (nxt_slow_path(recv_msg->fd < 0)) { + nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port", + recv_msg->stream, recv_msg->fd); - nxt_unit_port_id_init(&req->response_port, port_msg->pid, - port_msg->reply_port); + return NXT_UNIT_ERROR; + } - req->request = recv_msg.start; + new_port_msg = recv_msg->start; - lnk = nxt_queue_first(&incoming_buf); - b = nxt_container_of(lnk, nxt_unit_mmap_buf_t, link); + nxt_unit_debug(ctx, "#%"PRIu32": new_port: %d,%d fd %d", + recv_msg->stream, (int) new_port_msg->pid, + (int) new_port_msg->id, recv_msg->fd); - req->request_buf = &b->buf; - req->response = NULL; - req->response_buf = NULL; + nb = 0; - r = req->request; + if (nxt_slow_path(ioctl(recv_msg->fd, FIONBIO, &nb) == -1)) { + nxt_unit_alert(ctx, "#%"PRIu32": new_port: ioctl(%d, FIONBIO, 0) " + "failed: %s (%d)", recv_msg->fd, strerror(errno), errno); - req->content_length = r->content_length; + return NXT_UNIT_ERROR; + } - req->content_buf = req->request_buf; - req->content_buf->free = nxt_unit_sptr_get(&r->preread_content); + nxt_unit_port_id_init(&new_port.id, new_port_msg->pid, + new_port_msg->id); - /* Move process to req_impl. */ - req_impl->recv_msg = recv_msg; + new_port.in_fd = -1; + new_port.out_fd = recv_msg->fd; + new_port.data = NULL; - recv_msg.process = NULL; + recv_msg->fd = -1; - nxt_queue_init(&req_impl->outgoing_buf); - nxt_queue_init(&req_impl->incoming_buf); + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - nxt_queue_each(b, &incoming_buf, nxt_unit_mmap_buf_t, link) - { - b->req = req; - } nxt_queue_loop; + return lib->callbacks.add_port(ctx, &new_port); +} - nxt_queue_add(&req_impl->incoming_buf, &incoming_buf); - nxt_queue_init(&incoming_buf); - req->response_max_fields = 0; - req_impl->state = NXT_UNIT_RS_START; +static int +nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) +{ + nxt_unit_impl_t *lib; + nxt_unit_request_t *r; + nxt_unit_mmap_buf_t *b; + nxt_unit_request_info_t *req; + nxt_unit_request_info_impl_t *req_impl; - nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", port_msg->stream, - (int) r->method_length, nxt_unit_sptr_get(&r->method), - (int) r->target_length, nxt_unit_sptr_get(&r->target), - (int) r->content_length); + if (nxt_slow_path(recv_msg->mmap == 0)) { + nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory", + recv_msg->stream); - cb->request_handler(req); + return NXT_UNIT_ERROR; + } - rc = NXT_UNIT_OK; - break; + if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) { + nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least " + "%d expected", recv_msg->stream, (int) recv_msg->size, + (int) sizeof(nxt_unit_request_t)); - case _NXT_PORT_MSG_REMOVE_PID: - if (nxt_slow_path(recv_msg.size != sizeof(pid))) { - nxt_unit_warn(ctx, "#%"PRIu32": remove_pid: invalid message size " - "(%d != %d)", port_msg->stream, (int) recv_msg.size, - (int) sizeof(pid)); + return NXT_UNIT_ERROR; + } - goto fail; - } + req_impl = nxt_unit_request_info_get(ctx); + if (nxt_slow_path(req_impl == NULL)) { + nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed", + recv_msg->stream); - memcpy(&pid, recv_msg.start, sizeof(pid)); + return NXT_UNIT_ERROR; + } - nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d", - port_msg->stream, (int) pid); + req = &req_impl->req; - cb->remove_pid(ctx, pid); + nxt_unit_port_id_init(&req->response_port, recv_msg->pid, + recv_msg->reply_port); - rc = NXT_UNIT_OK; - break; + req->request = recv_msg->start; - default: - nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d", - port_msg->stream, (int) port_msg->type); + b = recv_msg->incoming_buf; - goto fail; + req->request_buf = &b->buf; + req->response = NULL; + req->response_buf = NULL; + + r = req->request; + + req->content_length = r->content_length; + + req->content_buf = req->request_buf; + req->content_buf->free = nxt_unit_sptr_get(&r->preread_content); + + /* "Move" process reference to req_impl. */ + req_impl->process = nxt_unit_msg_get_process(ctx, recv_msg); + if (nxt_slow_path(req_impl->process == NULL)) { + return NXT_UNIT_ERROR; } -fail: + recv_msg->process = NULL; - if (fd != -1) { - close(fd); + req_impl->stream = recv_msg->stream; + + req_impl->outgoing_buf = NULL; + + for (b = recv_msg->incoming_buf; b != NULL; b = b->next) { + b->req = req; } - if (port_msg->mmap) { - nxt_queue_each(b, &incoming_buf, nxt_unit_mmap_buf_t, link) - { - nxt_unit_mmap_release(b->hdr, b->buf.start, - b->buf.end - b->buf.start); + /* "Move" incoming buffer list to req_impl. */ + req_impl->incoming_buf = recv_msg->incoming_buf; + req_impl->incoming_buf->prev = &req_impl->incoming_buf; + recv_msg->incoming_buf = NULL; + + req->response_max_fields = 0; + req_impl->state = NXT_UNIT_RS_START; + req_impl->websocket = 0; + + nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream, + (int) r->method_length, nxt_unit_sptr_get(&r->method), + (int) r->target_length, nxt_unit_sptr_get(&r->target), + (int) r->content_length); + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + + lib->callbacks.request_handler(req); - nxt_unit_mmap_buf_release(b); - } nxt_queue_loop; + return NXT_UNIT_OK; +} + + +static int +nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) +{ + size_t hsize; + nxt_unit_impl_t *lib; + nxt_unit_mmap_buf_t *b; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_callbacks_t *cb; + nxt_unit_request_info_t *req; + nxt_unit_request_info_impl_t *req_impl; + nxt_unit_websocket_frame_impl_t *ws_impl; + + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + req_impl = nxt_unit_request_hash_find(&ctx_impl->requests, recv_msg->stream, + recv_msg->last); + if (req_impl == NULL) { + return NXT_UNIT_OK; } - if (recv_msg.process != NULL) { - nxt_unit_process_use(ctx, recv_msg.process, -1); + req = &req_impl->req; + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + cb = &lib->callbacks; + + if (cb->websocket_handler && recv_msg->size >= 2) { + ws_impl = nxt_unit_websocket_frame_get(ctx); + if (nxt_slow_path(ws_impl == NULL)) { + nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed", + req_impl->stream); + + return NXT_UNIT_ERROR; + } + + ws_impl->ws.req = req; + + ws_impl->buf = NULL; + ws_impl->retain_buf = NULL; + + if (recv_msg->mmap) { + for (b = recv_msg->incoming_buf; b != NULL; b = b->next) { + b->req = req; + } + + /* "Move" incoming buffer list to ws_impl. */ + ws_impl->buf = recv_msg->incoming_buf; + ws_impl->buf->prev = &ws_impl->buf; + recv_msg->incoming_buf = NULL; + + b = ws_impl->buf; + + } else { + b = nxt_unit_mmap_buf_get(ctx); + if (nxt_slow_path(b == NULL)) { + return NXT_UNIT_ERROR; + } + + b->hdr = NULL; + b->req = req; + b->buf.start = recv_msg->start; + b->buf.free = b->buf.start; + b->buf.end = b->buf.start + recv_msg->size; + + nxt_unit_mmap_buf_insert(&ws_impl->buf, b); + } + + ws_impl->ws.header = (void *) b->buf.start; + ws_impl->ws.payload_len = nxt_websocket_frame_payload_len( + ws_impl->ws.header); + + hsize = nxt_websocket_frame_header_size(ws_impl->ws.header); + + if (ws_impl->ws.header->mask) { + ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4; + + } else { + ws_impl->ws.mask = NULL; + } + + b->buf.free += hsize; + + ws_impl->ws.content_buf = &b->buf; + ws_impl->ws.content_length = ws_impl->ws.payload_len; + + nxt_unit_req_debug(req, "websocket_handler: opcode=%d, " + "payload_len=%"PRIu64, + ws_impl->ws.header->opcode, + ws_impl->ws.payload_len); + + cb->websocket_handler(&ws_impl->ws); } - return rc; + if (recv_msg->last) { + req_impl->websocket = 0; + + if (cb->close_handler) { + nxt_unit_req_debug(req, "close_handler"); + + cb->close_handler(req); + + } else { + nxt_unit_request_done(req, NXT_UNIT_ERROR); + } + } + + return NXT_UNIT_OK; } @@ -815,9 +1059,7 @@ nxt_unit_request_info_get(nxt_unit_ctx_t *ctx) static void nxt_unit_request_info_release(nxt_unit_request_info_t *req) { - nxt_unit_mmap_buf_t *b; nxt_unit_ctx_impl_t *ctx_impl; - nxt_unit_recv_msg_t *recv_msg; nxt_unit_request_info_impl_t *req_impl; ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx); @@ -826,30 +1068,31 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req) req->response = NULL; req->response_buf = NULL; - recv_msg = &req_impl->recv_msg; + if (req_impl->process != NULL) { + nxt_unit_process_use(req->ctx, req_impl->process, -1); - if (recv_msg->process != NULL) { - nxt_unit_process_use(req->ctx, recv_msg->process, -1); - - recv_msg->process = NULL; + req_impl->process = NULL; } - nxt_queue_each(b, &req_impl->outgoing_buf, nxt_unit_mmap_buf_t, link) { - - nxt_unit_buf_free(&b->buf); + if (req_impl->websocket) { + nxt_unit_request_hash_find(&ctx_impl->requests, req_impl->stream, 1); - } nxt_queue_loop; - - nxt_queue_each(b, &req_impl->incoming_buf, nxt_unit_mmap_buf_t, link) { + req_impl->websocket = 0; + } - nxt_unit_mmap_release(b->hdr, b->buf.start, b->buf.end - b->buf.start); - nxt_unit_mmap_buf_release(b); + while (req_impl->outgoing_buf != NULL) { + nxt_unit_mmap_buf_free(req_impl->outgoing_buf); + } - } nxt_queue_loop; + while (req_impl->incoming_buf != NULL) { + nxt_unit_mmap_buf_free(req_impl->incoming_buf); + } nxt_queue_remove(&req_impl->link); nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link); + + req_impl->state = NXT_UNIT_RS_RELEASED; } @@ -868,6 +1111,68 @@ nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl) } +static nxt_unit_websocket_frame_impl_t * +nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx) +{ + nxt_queue_link_t *lnk; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_websocket_frame_impl_t *ws_impl; + + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + if (nxt_queue_is_empty(&ctx_impl->free_ws)) { + ws_impl = malloc(sizeof(nxt_unit_websocket_frame_impl_t)); + if (nxt_slow_path(ws_impl == NULL)) { + nxt_unit_warn(ctx, "websocket frame allocation failed"); + + return NULL; + } + + } else { + lnk = nxt_queue_first(&ctx_impl->free_ws); + nxt_queue_remove(lnk); + + ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link); + } + + ws_impl->ctx_impl = ctx_impl; + + return ws_impl; +} + + +static void +nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws) +{ + nxt_unit_websocket_frame_impl_t *ws_impl; + + ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws); + + while (ws_impl->buf != NULL) { + nxt_unit_mmap_buf_free(ws_impl->buf); + } + + ws->req = NULL; + + if (ws_impl->retain_buf != NULL) { + free(ws_impl->retain_buf); + + ws_impl->retain_buf = NULL; + } + + nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link); +} + + +static void +nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws_impl) +{ + nxt_queue_remove(&ws_impl->link); + + free(ws_impl); +} + + uint16_t nxt_unit_field_hash(const char *name, size_t name_length) { @@ -1275,6 +1580,10 @@ nxt_unit_response_send(nxt_unit_request_info_t *req) return NXT_UNIT_ERROR; } + if (req->request->websocket_handshake && req->response->status == 101) { + nxt_unit_response_upgrade(req); + } + nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes", req->response->fields_count, (int) (req->response_buf->free @@ -1282,9 +1591,7 @@ nxt_unit_response_send(nxt_unit_request_info_t *req) mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf); - rc = nxt_unit_mmap_buf_send(req->ctx, - req_impl->recv_msg.port_msg.stream, - mmap_buf, 0); + rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 0); if (nxt_fast_path(rc == NXT_UNIT_OK)) { req->response = NULL; req->response_buf = NULL; @@ -1312,7 +1619,6 @@ nxt_unit_buf_t * nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size) { int rc; - nxt_unit_process_t *process; nxt_unit_mmap_buf_t *mmap_buf; nxt_unit_request_info_impl_t *req_impl; @@ -1327,11 +1633,6 @@ nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size) req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); - process = nxt_unit_msg_get_process(req->ctx, &req_impl->recv_msg); - if (nxt_slow_path(process == NULL)) { - return NULL; - } - mmap_buf = nxt_unit_mmap_buf_get(req->ctx); if (nxt_slow_path(mmap_buf == NULL)) { return NULL; @@ -1339,10 +1640,10 @@ nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size) mmap_buf->req = req; - nxt_queue_insert_tail(&req_impl->outgoing_buf, &mmap_buf->link); + nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf); - rc = nxt_unit_get_outgoing_buf(req->ctx, process, &req->response_port, - size, mmap_buf); + rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, + &req->response_port, size, mmap_buf); if (nxt_slow_path(rc != NXT_UNIT_OK)) { nxt_unit_mmap_buf_release(mmap_buf); @@ -1366,13 +1667,13 @@ nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) pthread_mutex_lock(&lib->mutex); - recv_msg->process = nxt_unit_process_find(ctx, recv_msg->port_msg.pid, 0); + recv_msg->process = nxt_unit_process_find(ctx, recv_msg->pid, 0); pthread_mutex_unlock(&lib->mutex); if (recv_msg->process == NULL) { nxt_unit_warn(ctx, "#%"PRIu32": process %d not found", - recv_msg->port_msg.stream, (int) recv_msg->port_msg.pid); + recv_msg->stream, (int) recv_msg->pid); } return recv_msg->process; @@ -1382,23 +1683,21 @@ nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) static nxt_unit_mmap_buf_t * nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx) { - nxt_queue_link_t *lnk; nxt_unit_mmap_buf_t *mmap_buf; nxt_unit_ctx_impl_t *ctx_impl; ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); - if (nxt_queue_is_empty(&ctx_impl->free_buf)) { + if (ctx_impl->free_buf == NULL) { mmap_buf = malloc(sizeof(nxt_unit_mmap_buf_t)); if (nxt_slow_path(mmap_buf == NULL)) { nxt_unit_warn(ctx, "failed to allocate buf"); } } else { - lnk = nxt_queue_first(&ctx_impl->free_buf); - nxt_queue_remove(lnk); + mmap_buf = ctx_impl->free_buf; - mmap_buf = nxt_container_of(lnk, nxt_unit_mmap_buf_t, link); + nxt_unit_mmap_buf_remove(mmap_buf); } mmap_buf->ctx_impl = ctx_impl; @@ -1410,9 +1709,91 @@ nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx) static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf) { - nxt_queue_remove(&mmap_buf->link); + nxt_unit_mmap_buf_remove(mmap_buf); + + nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf); +} + + +typedef struct { + size_t len; + const char *str; +} nxt_unit_str_t; + + +#define nxt_unit_str(str) { nxt_length(str), str } + + +int +nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req) +{ + return req->request->websocket_handshake; +} + + +int +nxt_unit_response_upgrade(nxt_unit_request_info_t *req) +{ + int rc; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_request_info_impl_t *req_impl; + + req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); + + if (nxt_slow_path(req_impl->websocket != 0)) { + nxt_unit_req_debug(req, "upgrade: already upgraded"); + + return NXT_UNIT_OK; + } + + if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { + nxt_unit_req_warn(req, "upgrade: response is not initialized yet"); + + return NXT_UNIT_ERROR; + } + + if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) { + nxt_unit_req_warn(req, "upgrade: response already sent"); + + return NXT_UNIT_ERROR; + } + + ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx); + + rc = nxt_unit_request_hash_add(&ctx_impl->requests, req_impl); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + nxt_unit_req_warn(req, "upgrade: failed to add request to hash"); + + return NXT_UNIT_ERROR; + } + + req_impl->websocket = 1; + + req->response->status = 101; + + return NXT_UNIT_OK; +} + + +int +nxt_unit_response_is_websocket(nxt_unit_request_info_t *req) +{ + nxt_unit_request_info_impl_t *req_impl; - nxt_queue_insert_tail(&mmap_buf->ctx_impl->free_buf, &mmap_buf->link); + req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); + + return req_impl->websocket; +} + + +nxt_unit_request_info_t * +nxt_unit_get_request_info_from_data(void *data) +{ + nxt_unit_request_info_impl_t *req_impl; + + req_impl = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data); + + return &req_impl->req; } @@ -1445,9 +1826,7 @@ nxt_unit_buf_send(nxt_unit_buf_t *buf) } if (nxt_fast_path(buf->free > buf->start)) { - rc = nxt_unit_mmap_buf_send(req->ctx, - req_impl->recv_msg.port_msg.stream, - mmap_buf, 0); + rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 0); if (nxt_slow_path(rc != NXT_UNIT_OK)) { return rc; } @@ -1472,10 +1851,7 @@ nxt_unit_buf_send_done(nxt_unit_buf_t *buf) req = mmap_buf->req; req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); - rc = nxt_unit_mmap_buf_send(req->ctx, - req_impl->recv_msg.port_msg.stream, - mmap_buf, 1); - + rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 1); if (nxt_slow_path(rc == NXT_UNIT_OK)) { nxt_unit_mmap_buf_release(mmap_buf); @@ -1506,6 +1882,7 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); buf = &mmap_buf->buf; + hdr = mmap_buf->hdr; m.mmap_msg.size = buf->free - buf->start; @@ -1514,15 +1891,15 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, m.msg.reply_port = 0; m.msg.type = _NXT_PORT_MSG_DATA; m.msg.last = last != 0; - m.msg.mmap = m.mmap_msg.size > 0; + m.msg.mmap = hdr != NULL && m.mmap_msg.size > 0; m.msg.nf = 0; m.msg.mf = 0; m.msg.tracking = 0; - hdr = mmap_buf->hdr; - - m.mmap_msg.mmap_id = hdr->id; - m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr, (u_char *) buf->start); + if (hdr != NULL) { + m.mmap_msg.mmap_id = hdr->id; + m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr, (u_char *) buf->start); + } nxt_unit_debug(ctx, "#%"PRIu32": send mmap: (%d,%d,%d)", stream, @@ -1531,14 +1908,13 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, (int) m.mmap_msg.size); res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, &m, - m.mmap_msg.size > 0 ? sizeof(m) - : sizeof(m.msg), + m.msg.mmap ? sizeof(m) : sizeof(m.msg), NULL, 0); if (nxt_slow_path(res != sizeof(m))) { return NXT_UNIT_ERROR; } - if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) { + if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE && hdr != NULL) { last_used = (u_char *) buf->free - 1; first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1; @@ -1557,11 +1933,17 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, void nxt_unit_buf_free(nxt_unit_buf_t *buf) { - nxt_unit_mmap_buf_t *mmap_buf; + nxt_unit_mmap_buf_free(nxt_container_of(buf, nxt_unit_mmap_buf_t, buf)); +} - mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf); - nxt_unit_mmap_release(mmap_buf->hdr, buf->start, buf->end - buf->start); +static void +nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf) +{ + if (nxt_fast_path(mmap_buf->hdr != NULL)) { + nxt_unit_mmap_release(mmap_buf->hdr, mmap_buf->buf.start, + mmap_buf->buf.end - mmap_buf->buf.start); + } nxt_unit_mmap_buf_release(mmap_buf); } @@ -1570,26 +1952,15 @@ nxt_unit_buf_free(nxt_unit_buf_t *buf) nxt_unit_buf_t * nxt_unit_buf_next(nxt_unit_buf_t *buf) { - nxt_queue_link_t *lnk; - nxt_unit_mmap_buf_t *mmap_buf; - nxt_unit_request_info_impl_t *req_impl; + nxt_unit_mmap_buf_t *mmap_buf; mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf); - req_impl = nxt_container_of(mmap_buf->req, nxt_unit_request_info_impl_t, - req); - lnk = &mmap_buf->link; - - if (lnk == nxt_queue_last(&req_impl->incoming_buf) - || lnk == nxt_queue_last(&req_impl->outgoing_buf)) - { + if (mmap_buf->next == NULL) { return NULL; } - lnk = nxt_queue_next(lnk); - mmap_buf = nxt_container_of(lnk, nxt_unit_mmap_buf_t, link); - - return &mmap_buf->buf; + return &mmap_buf->next->buf; } @@ -1614,7 +1985,6 @@ nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start, int rc; uint32_t part_size; const char *part_start; - nxt_unit_process_t *process; nxt_unit_mmap_buf_t mmap_buf; nxt_unit_request_info_impl_t *req_impl; @@ -1641,16 +2011,12 @@ nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start, part_start += part_size; } - process = nxt_unit_msg_get_process(req->ctx, &req_impl->recv_msg); - if (nxt_slow_path(process == NULL)) { - return NXT_UNIT_ERROR; - } - while (size > 0) { part_size = nxt_min(size, PORT_MMAP_DATA_SIZE); - rc = nxt_unit_get_outgoing_buf(req->ctx, process, &req->response_port, - part_size, &mmap_buf); + rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, + &req->response_port, part_size, + &mmap_buf); if (nxt_slow_path(rc != NXT_UNIT_OK)) { return rc; } @@ -1658,9 +2024,7 @@ nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start, mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free, part_start, part_size); - rc = nxt_unit_mmap_buf_send(req->ctx, - req_impl->recv_msg.port_msg.stream, - &mmap_buf, 0); + rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0); if (nxt_slow_path(rc != NXT_UNIT_OK)) { nxt_unit_mmap_release(mmap_buf.hdr, mmap_buf.buf.start, mmap_buf.buf.end - mmap_buf.buf.start); @@ -1766,6 +2130,14 @@ nxt_unit_response_write_cb(nxt_unit_request_info_t *req, ssize_t nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size) { + return nxt_unit_buf_read(&req->content_buf, &req->content_length, + dst, size); +} + + +static ssize_t +nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size) +{ u_char *p; size_t rest, copy, read; nxt_unit_buf_t *buf; @@ -1773,7 +2145,7 @@ nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size) p = dst; rest = size; - buf = req->content_buf; + buf = *b; while (buf != NULL) { copy = buf->end - buf->free; @@ -1795,11 +2167,11 @@ nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size) buf = nxt_unit_buf_next(buf); } - req->content_buf = buf; + *b = buf; read = size - rest; - req->content_length -= read; + *len -= read; return read; } @@ -1852,7 +2224,7 @@ skip_response_send: lib = nxt_container_of(req->unit, nxt_unit_impl_t, unit); - msg.stream = req_impl->recv_msg.port_msg.stream; + msg.stream = req_impl->stream; msg.pid = lib->pid; msg.reply_port = 0; msg.type = (rc == NXT_UNIT_OK) ? _NXT_PORT_MSG_DATA @@ -1874,6 +2246,162 @@ skip_response_send: } +int +nxt_unit_websocket_send(nxt_unit_request_info_t *req, uint8_t opcode, + uint8_t last, const void *start, size_t size) +{ + const struct iovec iov = { (void *) start, size }; + + return nxt_unit_websocket_sendv(req, opcode, last, &iov, 1); +} + + +int +nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode, + uint8_t last, const struct iovec *iov, int iovcnt) +{ + int i, rc; + size_t l, copy; + uint32_t payload_len, buf_size; + const uint8_t *b; + nxt_unit_buf_t *buf; + nxt_websocket_header_t *wh; + + payload_len = 0; + + for (i = 0; i < iovcnt; i++) { + payload_len += iov[i].iov_len; + } + + buf_size = 10 + payload_len; + + buf = nxt_unit_response_buf_alloc(req, nxt_min(buf_size, + PORT_MMAP_DATA_SIZE)); + if (nxt_slow_path(buf == NULL)) { + nxt_unit_req_error(req, "Failed to allocate buf for content"); + + return NXT_UNIT_ERROR; + } + + buf->start[0] = 0; + buf->start[1] = 0; + + wh = (void *) buf->free; + + buf->free = nxt_websocket_frame_init(wh, payload_len); + wh->fin = last; + wh->opcode = opcode; + + for (i = 0; i < iovcnt; i++) { + b = iov[i].iov_base; + l = iov[i].iov_len; + + while (l > 0) { + copy = buf->end - buf->free; + copy = nxt_min(l, copy); + + buf->free = nxt_cpymem(buf->free, b, copy); + b += copy; + l -= copy; + + if (l > 0) { + buf_size -= buf->end - buf->start; + + rc = nxt_unit_buf_send(buf); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + nxt_unit_req_error(req, "Failed to send content"); + + return NXT_UNIT_ERROR; + } + + buf = nxt_unit_response_buf_alloc(req, nxt_min(buf_size, + PORT_MMAP_DATA_SIZE)); + if (nxt_slow_path(buf == NULL)) { + nxt_unit_req_error(req, + "Failed to allocate buf for content"); + + return NXT_UNIT_ERROR; + } + } + } + } + + if (buf->free > buf->start) { + rc = nxt_unit_buf_send(buf); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + nxt_unit_req_error(req, "Failed to send content"); + } + } + + return rc; +} + + +ssize_t +nxt_unit_websocket_read(nxt_unit_websocket_frame_t *ws, void *dst, + size_t size) +{ + ssize_t res; + uint8_t *b; + uint64_t i, d; + + res = nxt_unit_buf_read(&ws->content_buf, &ws->content_length, + dst, size); + + if (ws->mask == NULL) { + return res; + } + + b = dst; + d = (ws->payload_len - ws->content_length - res) % 4; + + for (i = 0; i < (uint64_t) res; i++) { + b[i] ^= ws->mask[ (i + d) % 4 ]; + } + + return res; +} + + +int +nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws) +{ + char *b; + size_t size; + nxt_unit_websocket_frame_impl_t *ws_impl; + + ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws); + + if (ws_impl->retain_buf != NULL || ws_impl->buf->hdr != NULL) { + return NXT_UNIT_OK; + } + + size = ws_impl->buf->buf.end - ws_impl->buf->buf.start; + + b = malloc(size); + if (nxt_slow_path(b == NULL)) { + return NXT_UNIT_ERROR; + } + + memcpy(b, ws_impl->buf->buf.start, size); + + ws_impl->buf->buf.start = b; + ws_impl->buf->buf.free = b; + ws_impl->buf->buf.end = b + size; + + ws_impl->retain_buf = b; + + return NXT_UNIT_OK; +} + + +void +nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws) +{ + nxt_unit_websocket_frame_release(ws); +} + + static nxt_port_mmap_header_t * nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, nxt_chunk_id_t *c, int n) @@ -2355,7 +2883,7 @@ nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) if (recv_msg->size < (int) sizeof(nxt_port_mmap_tracking_msg_t)) { nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: too small message (%d)", - recv_msg->port_msg.stream, (int) recv_msg->size); + recv_msg->stream, (int) recv_msg->size); return 0; } @@ -2378,18 +2906,18 @@ nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: " "invalid mmap id %d,%"PRIu32, - recv_msg->port_msg.stream, - (int) process->pid, tracking_msg->mmap_id); + recv_msg->stream, (int) process->pid, + tracking_msg->mmap_id); return 0; } c = tracking_msg->tracking_id; - rc = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->port_msg.stream, 0); + rc = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0); if (rc == 0) { nxt_unit_debug(ctx, "#%"PRIu32": tracking cancelled", - recv_msg->port_msg.stream); + recv_msg->stream); nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c); } @@ -2401,19 +2929,18 @@ nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) static int -nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, - nxt_queue_t *incoming_buf) +nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) { void *start; uint32_t size; nxt_unit_process_t *process; - nxt_unit_mmap_buf_t *b; + nxt_unit_mmap_buf_t *b, **incoming_tail; nxt_port_mmap_msg_t *mmap_msg, *end; nxt_port_mmap_header_t *hdr; if (nxt_slow_path(recv_msg->size < sizeof(nxt_port_mmap_msg_t))) { nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: too small message (%d)", - recv_msg->port_msg.stream, (int) recv_msg->size); + recv_msg->stream, (int) recv_msg->size); return NXT_UNIT_ERROR; } @@ -2426,6 +2953,8 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, mmap_msg = recv_msg->start; end = nxt_pointer_to(recv_msg->start, recv_msg->size); + incoming_tail = &recv_msg->incoming_buf; + pthread_mutex_lock(&process->incoming.mutex); for (; mmap_msg < end; mmap_msg++) { @@ -2435,8 +2964,8 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: " "invalid mmap id %d,%"PRIu32, - recv_msg->port_msg.stream, - (int) process->pid, mmap_msg->mmap_id); + recv_msg->stream, (int) process->pid, + mmap_msg->mmap_id); return NXT_UNIT_ERROR; } @@ -2453,16 +2982,16 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, if (nxt_slow_path(b == NULL)) { pthread_mutex_unlock(&process->incoming.mutex); - nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: " - "failed to allocate buf", - recv_msg->port_msg.stream); + nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf", + recv_msg->stream); nxt_unit_mmap_release(hdr, start, size); return NXT_UNIT_ERROR; } - nxt_queue_insert_tail(incoming_buf, &b->link); + nxt_unit_mmap_buf_insert(incoming_tail, b); + incoming_tail = &b->next; b->buf.start = start; b->buf.free = start; @@ -2470,7 +2999,7 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, b->hdr = hdr; nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)", - recv_msg->port_msg.stream, + recv_msg->stream, start, (int) size, (int) hdr->src_pid, (int) hdr->dst_pid, (int) hdr->id, (int) mmap_msg->chunk_id, @@ -2685,6 +3214,11 @@ nxt_unit_run_once(nxt_unit_ctx_t *ctx) if (nxt_fast_path(rsize > 0)) { rc = nxt_unit_process_msg(ctx, &ctx_impl->read_port_id, buf, rsize, oob, sizeof(oob)); + +#if (NXT_DEBUG) + memset(buf, 0xAC, rsize); +#endif + } else { rc = NXT_UNIT_ERROR; } @@ -2775,10 +3309,11 @@ nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data) void nxt_unit_ctx_free(nxt_unit_ctx_t *ctx) { - nxt_unit_impl_t *lib; - nxt_unit_ctx_impl_t *ctx_impl; - nxt_unit_mmap_buf_t *mmap_buf; - nxt_unit_request_info_impl_t *req_impl; + nxt_unit_impl_t *lib; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_mmap_buf_t *mmap_buf; + nxt_unit_request_info_impl_t *req_impl; + nxt_unit_websocket_frame_impl_t *ws_impl; ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); @@ -2792,15 +3327,14 @@ nxt_unit_ctx_free(nxt_unit_ctx_t *ctx) } nxt_queue_loop; - nxt_queue_remove(&ctx_impl->ctx_buf[0].link); - nxt_queue_remove(&ctx_impl->ctx_buf[1].link); - - nxt_queue_each(mmap_buf, &ctx_impl->free_buf, nxt_unit_mmap_buf_t, link) { + nxt_unit_mmap_buf_remove(&ctx_impl->ctx_buf[0]); + nxt_unit_mmap_buf_remove(&ctx_impl->ctx_buf[1]); - nxt_queue_remove(&mmap_buf->link); + while (ctx_impl->free_buf != NULL) { + mmap_buf = ctx_impl->free_buf; + nxt_unit_mmap_buf_remove(mmap_buf); free(mmap_buf); - - } nxt_queue_loop; + } nxt_queue_each(req_impl, &ctx_impl->free_req, nxt_unit_request_info_impl_t, link) @@ -2809,6 +3343,13 @@ nxt_unit_ctx_free(nxt_unit_ctx_t *ctx) } nxt_queue_loop; + nxt_queue_each(ws_impl, &ctx_impl->free_ws, + nxt_unit_websocket_frame_impl_t, link) + { + nxt_unit_websocket_frame_free(ws_impl); + + } nxt_queue_loop; + nxt_queue_remove(&ctx_impl->link); if (ctx_impl != &lib->main_ctx) { @@ -3454,6 +3995,83 @@ nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id, } +static nxt_int_t +nxt_unit_request_hash_test(nxt_lvlhsh_query_t *lhq, void *data) +{ + return NXT_OK; +} + + +static const nxt_lvlhsh_proto_t lvlhsh_requests_proto nxt_aligned(64) = { + NXT_LVLHSH_DEFAULT, + nxt_unit_request_hash_test, + nxt_lvlhsh_alloc, + nxt_lvlhsh_free, +}; + + +static int +nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash, + nxt_unit_request_info_impl_t *req_impl) +{ + uint32_t *stream; + nxt_int_t res; + nxt_lvlhsh_query_t lhq; + + stream = &req_impl->stream; + + lhq.key_hash = nxt_murmur_hash2(stream, sizeof(*stream)); + lhq.key.length = sizeof(*stream); + lhq.key.start = (u_char *) stream; + lhq.proto = &lvlhsh_requests_proto; + lhq.pool = NULL; + lhq.replace = 0; + lhq.value = req_impl; + + res = nxt_lvlhsh_insert(request_hash, &lhq); + + switch (res) { + + case NXT_OK: + return NXT_UNIT_OK; + + default: + return NXT_UNIT_ERROR; + } +} + + +static nxt_unit_request_info_impl_t * +nxt_unit_request_hash_find(nxt_lvlhsh_t *request_hash, uint32_t stream, + int remove) +{ + nxt_int_t res; + nxt_lvlhsh_query_t lhq; + + lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(stream)); + lhq.key.length = sizeof(stream); + lhq.key.start = (u_char *) &stream; + lhq.proto = &lvlhsh_requests_proto; + lhq.pool = NULL; + + if (remove) { + res = nxt_lvlhsh_delete(request_hash, &lhq); + + } else { + res = nxt_lvlhsh_find(request_hash, &lhq); + } + + switch (res) { + + case NXT_OK: + return lhq.value; + + default: + return NULL; + } +} + + void nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char *fmt, ...) { @@ -3526,8 +4144,7 @@ nxt_unit_req_log(nxt_unit_request_info_t *req, int level, const char *fmt, ...) if (nxt_fast_path(req != NULL)) { req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); - p += snprintf(p, end - p, - "#%"PRIu32": ", req_impl->recv_msg.port_msg.stream); + p += snprintf(p, end - p, "#%"PRIu32": ", req_impl->stream); } va_start(ap, fmt); diff --git a/src/nxt_unit.h b/src/nxt_unit.h index 532de20d..3471a758 100644 --- a/src/nxt_unit.h +++ b/src/nxt_unit.h @@ -9,6 +9,7 @@ #include <inttypes.h> #include <sys/types.h> +#include <sys/uio.h> #include <string.h> #include "nxt_version.h" @@ -106,17 +107,24 @@ struct nxt_unit_request_info_s { void *data; }; + /* * Set of application-specific callbacks. Application may leave all optional * callbacks as NULL. */ struct nxt_unit_callbacks_s { /* - * Process request data. Unlike all other callback, this callback + * Process request. Unlike all other callback, this callback * need to be defined by application. */ void (*request_handler)(nxt_unit_request_info_t *req); + /* Process websocket frame. */ + void (*websocket_handler)(nxt_unit_websocket_frame_t *ws); + + /* Connection closed. */ + void (*close_handler)(nxt_unit_request_info_t *req); + /* Add new Unit port to communicate with process pid. Optional. */ int (*add_port)(nxt_unit_ctx_t *, nxt_unit_port_t *port); @@ -293,6 +301,14 @@ int nxt_unit_response_is_sent(nxt_unit_request_info_t *req); nxt_unit_buf_t *nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size); +int nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req); + +int nxt_unit_response_upgrade(nxt_unit_request_info_t *req); + +int nxt_unit_response_is_websocket(nxt_unit_request_info_t *req); + +nxt_unit_request_info_t *nxt_unit_get_request_info_from_data(void *data); + int nxt_unit_buf_send(nxt_unit_buf_t *buf); void nxt_unit_buf_free(nxt_unit_buf_t *buf); @@ -315,6 +331,20 @@ ssize_t nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, void nxt_unit_request_done(nxt_unit_request_info_t *req, int rc); +int nxt_unit_websocket_send(nxt_unit_request_info_t *req, uint8_t opcode, + uint8_t last, const void *start, size_t size); + +int nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode, + uint8_t last, const struct iovec *iov, int iovcnt); + +ssize_t nxt_unit_websocket_read(nxt_unit_websocket_frame_t *ws, void *dst, + size_t size); + +int nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws); + +void nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws); + + void nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char* fmt, ...); void nxt_unit_req_log(nxt_unit_request_info_t *req, int level, diff --git a/src/nxt_unit_request.h b/src/nxt_unit_request.h index 2207cefa..52017a42 100644 --- a/src/nxt_unit_request.h +++ b/src/nxt_unit_request.h @@ -20,6 +20,7 @@ struct nxt_unit_request_s { uint8_t remote_length; uint8_t local_length; uint8_t tls; + uint8_t websocket_handshake; uint32_t server_name_length; uint32_t target_length; uint32_t path_length; diff --git a/src/nxt_unit_typedefs.h b/src/nxt_unit_typedefs.h index 871ce25b..26e54f91 100644 --- a/src/nxt_unit_typedefs.h +++ b/src/nxt_unit_typedefs.h @@ -7,19 +7,20 @@ #define _NXT_UNIT_TYPEDEFS_H_INCLUDED_ -typedef struct nxt_unit_s nxt_unit_t; -typedef struct nxt_unit_ctx_s nxt_unit_ctx_t; -typedef struct nxt_unit_port_id_s nxt_unit_port_id_t; -typedef struct nxt_unit_port_s nxt_unit_port_t; -typedef struct nxt_unit_buf_s nxt_unit_buf_t; -typedef struct nxt_unit_request_info_s nxt_unit_request_info_t; -typedef struct nxt_unit_callbacks_s nxt_unit_callbacks_t; -typedef struct nxt_unit_init_s nxt_unit_init_t; -typedef union nxt_unit_sptr_u nxt_unit_sptr_t; -typedef struct nxt_unit_field_s nxt_unit_field_t; -typedef struct nxt_unit_request_s nxt_unit_request_t; -typedef struct nxt_unit_response_s nxt_unit_response_t; -typedef struct nxt_unit_read_info_s nxt_unit_read_info_t; +typedef struct nxt_unit_s nxt_unit_t; +typedef struct nxt_unit_ctx_s nxt_unit_ctx_t; +typedef struct nxt_unit_port_id_s nxt_unit_port_id_t; +typedef struct nxt_unit_port_s nxt_unit_port_t; +typedef struct nxt_unit_buf_s nxt_unit_buf_t; +typedef struct nxt_unit_request_info_s nxt_unit_request_info_t; +typedef struct nxt_unit_callbacks_s nxt_unit_callbacks_t; +typedef struct nxt_unit_init_s nxt_unit_init_t; +typedef union nxt_unit_sptr_u nxt_unit_sptr_t; +typedef struct nxt_unit_field_s nxt_unit_field_t; +typedef struct nxt_unit_request_s nxt_unit_request_t; +typedef struct nxt_unit_response_s nxt_unit_response_t; +typedef struct nxt_unit_read_info_s nxt_unit_read_info_t; +typedef struct nxt_unit_websocket_frame_s nxt_unit_websocket_frame_t; #endif /* _NXT_UNIT_TYPEDEFS_H_INCLUDED_ */ diff --git a/src/nxt_unit_websocket.h b/src/nxt_unit_websocket.h new file mode 100644 index 00000000..beb2536e --- /dev/null +++ b/src/nxt_unit_websocket.h @@ -0,0 +1,27 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + +#ifndef _NXT_UNIT_WEBSOCKET_H_INCLUDED_ +#define _NXT_UNIT_WEBSOCKET_H_INCLUDED_ + +#include <inttypes.h> + +#include "nxt_unit_typedefs.h" +#include "nxt_websocket_header.h" + + +struct nxt_unit_websocket_frame_s { + nxt_unit_request_info_t *req; + + uint64_t payload_len; + nxt_websocket_header_t *header; + uint8_t *mask; + + nxt_unit_buf_t *content_buf; + uint64_t content_length; +}; + + +#endif /* _NXT_UNIT_WEBSOCKET_H_INCLUDED_ */ diff --git a/src/nxt_websocket.c b/src/nxt_websocket.c new file mode 100644 index 00000000..9a099760 --- /dev/null +++ b/src/nxt_websocket.c @@ -0,0 +1,122 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + +#include <nxt_main.h> +#include <nxt_websocket.h> +#include <nxt_websocket_header.h> + + +nxt_inline uint16_t +nxt_ntoh16(const uint8_t *b) +{ + return ((uint16_t) b[0]) << 8 | ((uint16_t) b[1]); +} + + +nxt_inline void +nxt_hton16(uint8_t *b, uint16_t v) +{ + b[0] = (v >> 8); + b[1] = (v & 0xFFu); +} + + +nxt_inline uint64_t +nxt_ntoh64(const uint8_t *b) +{ + return ((uint64_t) b[0]) << 56 + | ((uint64_t) b[1]) << 48 + | ((uint64_t) b[2]) << 40 + | ((uint64_t) b[3]) << 32 + | ((uint64_t) b[4]) << 24 + | ((uint64_t) b[5]) << 16 + | ((uint64_t) b[6]) << 8 + | ((uint64_t) b[7]); +} + + +nxt_inline void +nxt_hton64(uint8_t *b, uint64_t v) +{ + b[0] = (v >> 56); + b[1] = (v >> 48) & 0xFFu; + b[2] = (v >> 40) & 0xFFu; + b[3] = (v >> 32) & 0xFFu; + b[4] = (v >> 24) & 0xFFu; + b[5] = (v >> 16) & 0xFFu; + b[6] = (v >> 8) & 0xFFu; + b[7] = v & 0xFFu; +} + + +size_t +nxt_websocket_frame_header_size(const void *data) +{ + size_t res; + uint64_t p; + const nxt_websocket_header_t *h; + + h = data; + p = h->payload_len; + + res = 2; + + if (p == 126) { + res += 2; + } else if (p == 127) { + res += 8; + } + + if (h->mask) { + res += 4; + } + + return res; +} + + +uint64_t +nxt_websocket_frame_payload_len(const void *data) +{ + uint64_t p; + const nxt_websocket_header_t *h; + + h = data; + p = h->payload_len; + + if (p == 126) { + p = nxt_ntoh16(h->payload_len_); + } else if (p == 127) { + p = nxt_ntoh64(h->payload_len_); + } + + return p; +} + + +void * +nxt_websocket_frame_init(void *data, uint64_t payload_len) +{ + uint8_t *p; + nxt_websocket_header_t *h; + + h = data; + p = data; + + if (payload_len < 126) { + h->payload_len = payload_len; + return p + 2; + } + + if (payload_len < 65536) { + h->payload_len = 126; + nxt_hton16(h->payload_len_, payload_len); + return p + 4; + } + + h->payload_len = 127; + nxt_hton64(h->payload_len_, payload_len); + return p + 10; +} diff --git a/src/nxt_websocket.h b/src/nxt_websocket.h new file mode 100644 index 00000000..499a3268 --- /dev/null +++ b/src/nxt_websocket.h @@ -0,0 +1,21 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + +#ifndef _NXT_WEBSOCKET_H_INCLUDED_ +#define _NXT_WEBSOCKET_H_INCLUDED_ + + +enum { + NXT_WEBSOCKET_ACCEPT_SIZE = 28, +}; + + +NXT_EXPORT size_t nxt_websocket_frame_header_size(const void *data); +NXT_EXPORT uint64_t nxt_websocket_frame_payload_len(const void *data); +NXT_EXPORT void *nxt_websocket_frame_init(void *data, uint64_t payload_len); +NXT_EXPORT void nxt_websocket_accept(u_char *accept, const void *key); + + +#endif /* _NXT_WEBSOCKET_H_INCLUDED_ */ diff --git a/src/nxt_websocket_accept.c b/src/nxt_websocket_accept.c new file mode 100644 index 00000000..05cbcb56 --- /dev/null +++ b/src/nxt_websocket_accept.c @@ -0,0 +1,68 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + +#include <nxt_main.h> +#include <nxt_websocket.h> +#include <nxt_sha1.h> + + +static void +nxt_websocket_base64_encode(u_char *d, const uint8_t *s, size_t len) +{ + u_char c0, c1, c2; + static u_char basis[] = + "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; + + while (len > 2) { + c0 = s[0]; + c1 = s[1]; + c2 = s[2]; + + *d++ = basis[c0 >> 2]; + *d++ = basis[((c0 & 0x03) << 4) | (c1 >> 4)]; + *d++ = basis[((c1 & 0x0f) << 2) | (c2 >> 6)]; + *d++ = basis[c2 & 0x3f]; + + s += 3; + len -= 3; + } + + if (len > 0) { + c0 = s[0]; + *d++ = basis[c0 >> 2]; + + if (len == 1) { + *d++ = basis[(c0 & 0x03) << 4]; + *d++ = '='; + *d++ = '='; + + } else { + c1 = s[1]; + + *d++ = basis[((c0 & 0x03) << 4) | (c1 >> 4)]; + *d++ = basis[(c1 & 0x0f) << 2]; + + *d++ = '='; + } + } +} + + +void +nxt_websocket_accept(u_char *accept, const void *key) +{ + u_char bin_accept[20]; + nxt_sha1_t ctx; + static const char accept_guid[] = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; + + nxt_sha1_init(&ctx); + nxt_sha1_update(&ctx, key, 24); + nxt_sha1_update(&ctx, accept_guid, nxt_length(accept_guid)); + nxt_sha1_final(bin_accept, &ctx); + + nxt_websocket_base64_encode(accept, bin_accept, sizeof(bin_accept)); +} + + diff --git a/src/nxt_websocket_header.h b/src/nxt_websocket_header.h new file mode 100644 index 00000000..f75dfacd --- /dev/null +++ b/src/nxt_websocket_header.h @@ -0,0 +1,68 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + +#ifndef _NXT_WEBSOCKET_HEADER_H_INCLUDED_ +#define _NXT_WEBSOCKET_HEADER_H_INCLUDED_ + +#include <netinet/in.h> + + +typedef struct { +#if (BYTE_ORDER == BIG_ENDIAN) + uint8_t fin:1; + uint8_t rsv1:1; + uint8_t rsv2:1; + uint8_t rsv3:1; + uint8_t opcode:4; + + uint8_t mask:1; + uint8_t payload_len:7; +#endif + +#if (BYTE_ORDER == LITTLE_ENDIAN) + uint8_t opcode:4; + uint8_t rsv3:1; + uint8_t rsv2:1; + uint8_t rsv1:1; + uint8_t fin:1; + + uint8_t payload_len:7; + uint8_t mask:1; +#endif + + uint8_t payload_len_[8]; +} nxt_websocket_header_t; + + +enum { + NXT_WEBSOCKET_OP_CONT = 0x00, + NXT_WEBSOCKET_OP_TEXT = 0x01, + NXT_WEBSOCKET_OP_BINARY = 0x02, + NXT_WEBSOCKET_OP_CLOSE = 0x08, + NXT_WEBSOCKET_OP_PING = 0x09, + NXT_WEBSOCKET_OP_PONG = 0x0A, + + NXT_WEBSOCKET_OP_CTRL = 0x08, +}; + + +enum { + NXT_WEBSOCKET_CR_NORMAL = 1000, + NXT_WEBSOCKET_CR_GOING_AWAY = 1001, + NXT_WEBSOCKET_CR_PROTOCOL_ERROR = 1002, + NXT_WEBSOCKET_CR_UNPROCESSABLE_INPUT = 1003, + NXT_WEBSOCKET_CR_RESERVED = 1004, + NXT_WEBSOCKET_CR_NOT_PROVIDED = 1005, + NXT_WEBSOCKET_CR_ABNORMAL = 1006, + NXT_WEBSOCKET_CR_INVALID_DATA = 1007, + NXT_WEBSOCKET_CR_POLICY_VIOLATION = 1008, + NXT_WEBSOCKET_CR_MESSAGE_TOO_BIG = 1009, + NXT_WEBSOCKET_CR_EXTENSION_REQUIRED = 1010, + NXT_WEBSOCKET_CR_INTERNAL_SERVER_ERROR = 1011, + NXT_WEBSOCKET_CR_TLS_HANDSHAKE_FAILED = 1015, +}; + + +#endif /* _NXT_WEBSOCKET_HEADER_H_INCLUDED_ */ diff --git a/src/test/nxt_unit_websocket_chat.c b/src/test/nxt_unit_websocket_chat.c new file mode 100644 index 00000000..ecc9a243 --- /dev/null +++ b/src/test/nxt_unit_websocket_chat.c @@ -0,0 +1,348 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + +#include <fcntl.h> +#include <stdio.h> +#include <string.h> +#include <errno.h> + +#include <sys/mman.h> +#include <sys/stat.h> + +#include <nxt_unit.h> +#include <nxt_unit_request.h> +#include <nxt_clang.h> +#include <nxt_websocket.h> +#include <nxt_unit_websocket.h> +#include <nxt_main.h> + + +#define CONTENT_TYPE "Content-Type" +#define CONTENT_LENGTH "Content-Length" +#define TEXT_HTML "text/html" + +typedef struct { + nxt_queue_link_t link; + int id; +} ws_chat_request_data_t; + + +static int ws_chat_root(nxt_unit_request_info_t *req); +static void ws_chat_broadcast(const void *buf, size_t size); + + +static const char ws_chat_index_html[]; +static const int ws_chat_index_html_size; + +static char ws_chat_index_content_length[34]; +static int ws_chat_index_content_length_size; + +static nxt_queue_t ws_chat_sessions; +static int ws_chat_next_id = 0; + + +static void +ws_chat_request_handler(nxt_unit_request_info_t *req) +{ + static char buf[1024]; + int buf_size; + int rc = NXT_UNIT_OK; + nxt_unit_request_t *r; + ws_chat_request_data_t *data; + + r = req->request; + + const char* target = nxt_unit_sptr_get(&r->target); + + if (strcmp(target, "/") == 0) { + rc = ws_chat_root(req); + goto fail; + } + + if (strcmp(target, "/chat") == 0) { + if (!nxt_unit_request_is_websocket_handshake(req)) { + goto notfound; + } + + rc = nxt_unit_response_init(req, 101, 0, 0); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + goto fail; + } + + data = req->data; + nxt_queue_insert_tail(&ws_chat_sessions, &data->link); + + data->id = ws_chat_next_id++; + + nxt_unit_response_upgrade(req); + nxt_unit_response_send(req); + + + buf_size = snprintf(buf, sizeof(buf), "Guest #%d has joined.", data->id); + + ws_chat_broadcast(buf, buf_size); + + return; + } + +notfound: + + rc = nxt_unit_response_init(req, 404, 0, 0); + +fail: + + nxt_unit_request_done(req, rc); +} + + +static int +ws_chat_root(nxt_unit_request_info_t *req) +{ + int rc; + + rc = nxt_unit_response_init(req, 200 /* Status code. */, + 2 /* Number of response headers. */, + nxt_length(CONTENT_TYPE) + 1 + + nxt_length(TEXT_HTML) + 1 + + nxt_length(CONTENT_LENGTH) + 1 + + ws_chat_index_content_length_size + 1 + + ws_chat_index_html_size); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return rc; + } + + rc = nxt_unit_response_add_field(req, + CONTENT_TYPE, nxt_length(CONTENT_TYPE), + TEXT_HTML, nxt_length(TEXT_HTML)); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return rc; + } + + rc = nxt_unit_response_add_field(req, + CONTENT_LENGTH, nxt_length(CONTENT_LENGTH), + ws_chat_index_content_length, + ws_chat_index_content_length_size); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return rc; + } + + rc = nxt_unit_response_add_content(req, ws_chat_index_html, + ws_chat_index_html_size); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return rc; + } + + return nxt_unit_response_send(req); +} + + +static void +ws_chat_broadcast(const void *buf, size_t size) +{ + ws_chat_request_data_t *data; + nxt_unit_request_info_t *req; + + nxt_unit_debug(NULL, "broadcast: %s", buf); + + nxt_queue_each(data, &ws_chat_sessions, ws_chat_request_data_t, link) { + + req = nxt_unit_get_request_info_from_data(data); + + nxt_unit_req_debug(req, "broadcast: %s", buf); + + nxt_unit_websocket_send(req, NXT_WEBSOCKET_OP_TEXT, 1, buf, size); + } nxt_queue_loop; +} + + +static void +ws_chat_websocket_handler(nxt_unit_websocket_frame_t *ws) +{ + int buf_size; + static char buf[1024]; + ws_chat_request_data_t *data; + + if (ws->header->opcode != NXT_WEBSOCKET_OP_TEXT) { + return; + } + + data = ws->req->data; + + buf_size = snprintf(buf, sizeof(buf), "Guest #%d: ", data->id); + + buf_size += nxt_unit_websocket_read(ws, buf + buf_size, + nxt_min(sizeof(buf), + ws->content_length)); + + ws_chat_broadcast(buf, buf_size); + + nxt_unit_websocket_done(ws); +} + + +static void +ws_chat_close_handler(nxt_unit_request_info_t *req) +{ + int buf_size; + static char buf[1024]; + ws_chat_request_data_t *data; + + data = req->data; + buf_size = snprintf(buf, sizeof(buf), "Guest #%d has disconnected.", + data->id); + + nxt_queue_remove(&data->link); + nxt_unit_request_done(req, NXT_UNIT_OK); + + ws_chat_broadcast(buf, buf_size); +} + + +int +main() +{ + nxt_unit_ctx_t *ctx; + nxt_unit_init_t init; + + ws_chat_index_content_length_size = + snprintf(ws_chat_index_content_length, + sizeof(ws_chat_index_content_length), "%d", + ws_chat_index_html_size); + + nxt_queue_init(&ws_chat_sessions); + + memset(&init, 0, sizeof(nxt_unit_init_t)); + + init.callbacks.request_handler = ws_chat_request_handler; + init.callbacks.websocket_handler = ws_chat_websocket_handler; + init.callbacks.close_handler = ws_chat_close_handler; + + init.request_data_size = sizeof(ws_chat_request_data_t); + + ctx = nxt_unit_init(&init); + if (ctx == NULL) { + return 1; + } + + nxt_unit_run(ctx); + + nxt_unit_done(ctx); + + return 0; +} + + +static const char ws_chat_index_html[] = +"<html>\n" +"<head>\n" +" <title>WebSocket Chat Examples</title>\n" +" <style type=\"text/css\">\n" +" input#chat {\n" +" width: 410px\n" +" }\n" +"\n" +" #container {\n" +" width: 400px;\n" +" }\n" +"\n" +" #console {\n" +" border: 1px solid #CCCCCC;\n" +" border-right-color: #999999;\n" +" border-bottom-color: #999999;\n" +" height: 170px;\n" +" overflow-y: scroll;\n" +" padding: 5px;\n" +" width: 100%;\n" +" }\n" +"\n" +" #console p {\n" +" padding: 0;\n" +" margin: 0;\n" +" }\n" +" </style>\n" +" <script>\n" +" \"use strict\";\n" +"\n" +" var Chat = {};\n" +"\n" +" Chat.socket = null;\n" +"\n" +" Chat.connect = (function(host) {\n" +" if ('WebSocket' in window) {\n" +" Chat.socket = new WebSocket(host);\n" +" } else if ('MozWebSocket' in window) {\n" +" Chat.socket = new MozWebSocket(host);\n" +" } else {\n" +" Console.log('Error: WebSocket is not supported by this browser.');\n" +" return;\n" +" }\n" +"\n" +" Chat.socket.onopen = function () {\n" +" Console.log('Info: WebSocket connection opened.');\n" +" document.getElementById('chat').onkeydown = function(event) {\n" +" if (event.keyCode == 13) {\n" +" Chat.sendMessage();\n" +" }\n" +" };\n" +" };\n" +"\n" +" Chat.socket.onclose = function () {\n" +" document.getElementById('chat').onkeydown = null;\n" +" Console.log('Info: WebSocket closed.');\n" +" };\n" +"\n" +" Chat.socket.onmessage = function (message) {\n" +" Console.log(message.data);\n" +" };\n" +" });\n" +"\n" +" Chat.initialize = function() {\n" +" var proto = 'ws://';\n" +" if (window.location.protocol == 'https:') {\n" +" proto = 'wss://'\n" +" }\n" +" Chat.connect(proto + window.location.host + '/chat');\n" +" };\n" +"\n" +" Chat.sendMessage = (function() {\n" +" var message = document.getElementById('chat').value;\n" +" if (message != '') {\n" +" Chat.socket.send(message);\n" +" document.getElementById('chat').value = '';\n" +" }\n" +" });\n" +"\n" +" var Console = {};\n" +"\n" +" Console.log = (function(message) {\n" +" var console = document.getElementById('console');\n" +" var p = document.createElement('p');\n" +" p.style.wordWrap = 'break-word';\n" +" p.innerHTML = message;\n" +" console.appendChild(p);\n" +" while (console.childNodes.length > 25) {\n" +" console.removeChild(console.firstChild);\n" +" }\n" +" console.scrollTop = console.scrollHeight;\n" +" });\n" +"\n" +" Chat.initialize();\n" +"\n" +" </script>\n" +"</head>\n" +"<body>\n" +"<noscript><h2 style=\"color: #ff0000\">Seems your browser doesn't support Javascript! Websockets rely on Javascript being enabled. Please enable\n" +" Javascript and reload this page!</h2></noscript>\n" +"<div>\n" +" <p><input type=\"text\" placeholder=\"type and press enter to chat\" id=\"chat\" /></p>\n" +" <div id=\"container\">\n" +" <div id=\"console\"/>\n" +" </div>\n" +"</div>\n" +"</body>\n" +"</html>\n" +; + +static const int ws_chat_index_html_size = nxt_length(ws_chat_index_html); diff --git a/src/test/nxt_unit_websocket_echo.c b/src/test/nxt_unit_websocket_echo.c new file mode 100644 index 00000000..2a89cdc0 --- /dev/null +++ b/src/test/nxt_unit_websocket_echo.c @@ -0,0 +1,105 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + +#include <string.h> +#include <stdlib.h> + +#include <nxt_unit.h> +#include <nxt_unit_request.h> +#include <nxt_clang.h> +#include <nxt_websocket.h> +#include <nxt_unit_websocket.h> + + +static void +ws_echo_request_handler(nxt_unit_request_info_t *req) +{ + int rc; + const char *target; + + rc = NXT_UNIT_OK; + target = nxt_unit_sptr_get(&req->request->target); + + if (strcmp(target, "/") == 0) { + if (!nxt_unit_request_is_websocket_handshake(req)) { + goto notfound; + } + + rc = nxt_unit_response_init(req, 101, 0, 0); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + goto fail; + } + + nxt_unit_response_upgrade(req); + nxt_unit_response_send(req); + + return; + } + +notfound: + + rc = nxt_unit_response_init(req, 404, 0, 0); + +fail: + + nxt_unit_request_done(req, rc); +} + + +static void +ws_echo_websocket_handler(nxt_unit_websocket_frame_t *ws) +{ + uint8_t opcode; + ssize_t size; + nxt_unit_request_info_t *req; + + static size_t buf_size = 0; + static uint8_t *buf = NULL; + + if (buf_size < ws->content_length) { + buf = realloc(buf, ws->content_length); + buf_size = ws->content_length; + } + + req = ws->req; + opcode = ws->header->opcode; + + if (opcode == NXT_WEBSOCKET_OP_PONG) { + nxt_unit_websocket_done(ws); + return; + } + + size = nxt_unit_websocket_read(ws, buf, ws->content_length); + + nxt_unit_websocket_send(req, opcode, ws->header->fin, buf, size); + nxt_unit_websocket_done(ws); + + if (opcode == NXT_WEBSOCKET_OP_CLOSE) { + nxt_unit_request_done(req, NXT_UNIT_OK); + } +} + + +int +main() +{ + nxt_unit_ctx_t *ctx; + nxt_unit_init_t init; + + memset(&init, 0, sizeof(nxt_unit_init_t)); + + init.callbacks.request_handler = ws_echo_request_handler; + init.callbacks.websocket_handler = ws_echo_websocket_handler; + + ctx = nxt_unit_init(&init); + if (ctx == NULL) { + return 1; + } + + nxt_unit_run(ctx); + nxt_unit_done(ctx); + + return 0; +} |