summaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2019-08-20 16:31:53 +0300
committerMax Romanov <max.romanov@nginx.com>2019-08-20 16:31:53 +0300
commite501c74ddceab86e48c031ca9b5e154f52dcdae0 (patch)
tree7bfe94354df516d1ceefc5af3194ba943e443aa2 /src
parent9bbf54e23e185e94054072fff2673f6f5cd203e9 (diff)
downloadunit-e501c74ddceab86e48c031ca9b5e154f52dcdae0.tar.gz
unit-e501c74ddceab86e48c031ca9b5e154f52dcdae0.tar.bz2
Introducing websocket support in router and libunit.
Diffstat (limited to 'src')
-rw-r--r--src/nxt_conf_validation.c25
-rw-r--r--src/nxt_h1proto.c370
-rw-r--r--src/nxt_h1proto.h48
-rw-r--r--src/nxt_h1proto_websocket.c719
-rw-r--r--src/nxt_http.h28
-rw-r--r--src/nxt_http_request.c21
-rw-r--r--src/nxt_http_response.c2
-rw-r--r--src/nxt_http_websocket.c161
-rw-r--r--src/nxt_port.c1
-rw-r--r--src/nxt_port.h15
-rw-r--r--src/nxt_router.c181
-rw-r--r--src/nxt_router.h9
-rw-r--r--src/nxt_router_request.h71
-rw-r--r--src/nxt_sha1.c295
-rw-r--r--src/nxt_sha1.h24
-rw-r--r--src/nxt_unit.c1159
-rw-r--r--src/nxt_unit.h32
-rw-r--r--src/nxt_unit_request.h1
-rw-r--r--src/nxt_unit_typedefs.h27
-rw-r--r--src/nxt_unit_websocket.h27
-rw-r--r--src/nxt_websocket.c122
-rw-r--r--src/nxt_websocket.h21
-rw-r--r--src/nxt_websocket_accept.c68
-rw-r--r--src/nxt_websocket_header.h68
-rw-r--r--src/test/nxt_unit_websocket_chat.c348
-rw-r--r--src/test/nxt_unit_websocket_echo.c105
26 files changed, 3508 insertions, 440 deletions
diff --git a/src/nxt_conf_validation.c b/src/nxt_conf_validation.c
index 45c0eb41..ca8ec62e 100644
--- a/src/nxt_conf_validation.c
+++ b/src/nxt_conf_validation.c
@@ -102,6 +102,26 @@ static nxt_int_t nxt_conf_vldt_java_option(nxt_conf_validation_t *vldt,
nxt_conf_value_t *value);
+static nxt_conf_vldt_object_t nxt_conf_vldt_websocket_members[] = {
+ { nxt_string("read_timeout"),
+ NXT_CONF_VLDT_INTEGER,
+ NULL,
+ NULL },
+
+ { nxt_string("keepalive_interval"),
+ NXT_CONF_VLDT_INTEGER,
+ NULL,
+ NULL },
+
+ { nxt_string("max_frame_size"),
+ NXT_CONF_VLDT_INTEGER,
+ NULL,
+ NULL },
+
+ NXT_CONF_VLDT_END
+};
+
+
static nxt_conf_vldt_object_t nxt_conf_vldt_http_members[] = {
{ nxt_string("header_read_timeout"),
NXT_CONF_VLDT_INTEGER,
@@ -128,6 +148,11 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_http_members[] = {
NULL,
NULL },
+ { nxt_string("websocket"),
+ NXT_CONF_VLDT_OBJECT,
+ &nxt_conf_vldt_object,
+ (void *) &nxt_conf_vldt_websocket_members },
+
NXT_CONF_VLDT_END
};
diff --git a/src/nxt_h1proto.c b/src/nxt_h1proto.c
index a92dfe3b..a60bdb36 100644
--- a/src/nxt_h1proto.c
+++ b/src/nxt_h1proto.c
@@ -6,26 +6,9 @@
#include <nxt_router.h>
#include <nxt_http.h>
-
-
-struct nxt_h1proto_s {
- nxt_http_request_parse_t parser;
-
- uint8_t nbuffers;
- uint8_t keepalive; /* 1 bit */
- uint8_t chunked; /* 1 bit */
- nxt_http_te_t transfer_encoding:8; /* 2 bits */
-
- uint32_t header_size;
-
- nxt_http_request_t *request;
- nxt_buf_t *buffers;
- /*
- * All fields before the conn field will
- * be zeroed in a keep-alive connection.
- */
- nxt_conn_t *conn;
-};
+#include <nxt_h1proto.h>
+#include <nxt_websocket.h>
+#include <nxt_websocket_header.h>
/*
@@ -43,12 +26,18 @@ static void nxt_h1p_conn_proto_init(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_conn_request_init(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_conn_request_header_parse(nxt_task_t *task, void *obj,
void *data);
-static nxt_int_t nxt_h1p_header_process(nxt_h1proto_t *h1p,
+static nxt_int_t nxt_h1p_header_process(nxt_task_t *task, nxt_h1proto_t *h1p,
nxt_http_request_t *r);
static nxt_int_t nxt_h1p_header_buffer_test(nxt_task_t *task,
nxt_h1proto_t *h1p, nxt_conn_t *c, nxt_socket_conf_t *skcf);
static nxt_int_t nxt_h1p_connection(void *ctx, nxt_http_field_t *field,
uintptr_t data);
+static nxt_int_t nxt_h1p_upgrade(void *ctx, nxt_http_field_t *field,
+ uintptr_t data);
+static nxt_int_t nxt_h1p_websocket_key(void *ctx, nxt_http_field_t *field,
+ uintptr_t data);
+static nxt_int_t nxt_h1p_websocket_version(void *ctx, nxt_http_field_t *field,
+ uintptr_t data);
static nxt_int_t nxt_h1p_transfer_encoding(void *ctx, nxt_http_field_t *field,
uintptr_t data);
static void nxt_h1p_request_body_read(nxt_task_t *task, nxt_http_request_t *r);
@@ -70,8 +59,6 @@ static void nxt_h1p_conn_request_timeout(nxt_task_t *task, void *obj,
void *data);
static void nxt_h1p_conn_request_send_timeout(nxt_task_t *task, void *obj,
void *data);
-static nxt_msec_t nxt_h1p_conn_request_timer_value(nxt_conn_t *c,
- uintptr_t data);
nxt_inline void nxt_h1p_request_error(nxt_task_t *task, nxt_h1proto_t *h1p,
nxt_http_request_t *r);
static void nxt_h1p_request_close(nxt_task_t *task, nxt_http_proto_t proto,
@@ -91,16 +78,16 @@ static void nxt_h1p_idle_response_timeout(nxt_task_t *task, void *obj,
static nxt_msec_t nxt_h1p_idle_response_timer_value(nxt_conn_t *c,
uintptr_t data);
static void nxt_h1p_shutdown(nxt_task_t *task, nxt_conn_t *c);
+static void nxt_h1p_shutdown_(nxt_task_t *task, nxt_conn_t *c);
+static void nxt_h1p_conn_ws_shutdown(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_conn_closing(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_conn_free(nxt_task_t *task, void *obj, void *data);
-
#if (NXT_TLS)
static const nxt_conn_state_t nxt_http_idle_state;
static const nxt_conn_state_t nxt_h1p_shutdown_state;
#endif
static const nxt_conn_state_t nxt_h1p_idle_state;
-static const nxt_conn_state_t nxt_h1p_idle_close_state;
static const nxt_conn_state_t nxt_h1p_header_parse_state;
static const nxt_conn_state_t nxt_h1p_read_body_state;
static const nxt_conn_state_t nxt_h1p_request_send_state;
@@ -119,6 +106,7 @@ const nxt_http_proto_table_t nxt_http_proto[3] = {
.body_bytes_sent = nxt_h1p_request_body_bytes_sent,
.discard = nxt_h1p_request_discard,
.close = nxt_h1p_request_close,
+ .ws_frame_start = nxt_h1p_websocket_frame_start,
},
/* NXT_HTTP_PROTO_H2 */
/* NXT_HTTP_PROTO_DEVNULL */
@@ -129,6 +117,10 @@ static nxt_lvlhsh_t nxt_h1p_fields_hash;
static nxt_http_field_proc_t nxt_h1p_fields[] = {
{ nxt_string("Connection"), &nxt_h1p_connection, 0 },
+ { nxt_string("Upgrade"), &nxt_h1p_upgrade, 0 },
+ { nxt_string("Sec-WebSocket-Key"), &nxt_h1p_websocket_key, 0 },
+ { nxt_string("Sec-WebSocket-Version"),
+ &nxt_h1p_websocket_version, 0 },
{ nxt_string("Transfer-Encoding"), &nxt_h1p_transfer_encoding, 0 },
{ nxt_string("Host"), &nxt_http_request_host, 0 },
@@ -503,7 +495,7 @@ nxt_h1p_conn_request_header_parse(nxt_task_t *task, void *obj, void *data)
*/
h1p->keepalive = (h1p->parser.version.s.minor != '0');
- ret = nxt_h1p_header_process(h1p, r);
+ ret = nxt_h1p_header_process(task, h1p, r);
if (nxt_fast_path(ret == NXT_OK)) {
@@ -551,7 +543,7 @@ nxt_h1p_conn_request_header_parse(nxt_task_t *task, void *obj, void *data)
break;
}
- (void) nxt_h1p_header_process(h1p, r);
+ (void) nxt_h1p_header_process(task, h1p, r);
error:
@@ -562,8 +554,12 @@ error:
static nxt_int_t
-nxt_h1p_header_process(nxt_h1proto_t *h1p, nxt_http_request_t *r)
+nxt_h1p_header_process(nxt_task_t *task, nxt_h1proto_t *h1p,
+ nxt_http_request_t *r)
{
+ u_char *m;
+ nxt_int_t ret;
+
r->target.start = h1p->parser.target_start;
r->target.length = h1p->parser.target_end - h1p->parser.target_start;
@@ -578,7 +574,46 @@ nxt_h1p_header_process(nxt_h1proto_t *h1p, nxt_http_request_t *r)
r->fields = h1p->parser.fields;
- return nxt_http_fields_process(r->fields, &nxt_h1p_fields_hash, r);
+ ret = nxt_http_fields_process(r->fields, &nxt_h1p_fields_hash, r);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return ret;
+ }
+
+ if (h1p->connection_upgrade && h1p->upgrade_websocket) {
+ m = h1p->parser.method.start;
+
+ if (nxt_slow_path(h1p->parser.method.length != 3
+ || m[0] != 'G'
+ || m[1] != 'E'
+ || m[2] != 'T'))
+ {
+ nxt_log(task, NXT_LOG_INFO, "h1p upgrade: bad method");
+
+ return NXT_HTTP_BAD_REQUEST;
+ }
+
+ if (nxt_slow_path(h1p->parser.version.s.minor != '1')) {
+ nxt_log(task, NXT_LOG_INFO, "h1p upgrade: bad protocol version");
+
+ return NXT_HTTP_BAD_REQUEST;
+ }
+
+ if (nxt_slow_path(h1p->websocket_key == NULL)) {
+ nxt_log(task, NXT_LOG_INFO, "h1p upgrade: bad or absent websocket key");
+
+ return NXT_HTTP_BAD_REQUEST;
+ }
+
+ if (nxt_slow_path(h1p->websocket_version_ok == 0)) {
+ nxt_log(task, NXT_LOG_INFO, "h1p upgrade: bad or absent websocket version");
+
+ return NXT_HTTP_UPGRADE_REQUIRED;
+ }
+
+ r->websocket_handshake = 1;
+ }
+
+ return ret;
}
@@ -620,12 +655,68 @@ nxt_h1p_header_buffer_test(nxt_task_t *task, nxt_h1proto_t *h1p, nxt_conn_t *c,
static nxt_int_t
nxt_h1p_connection(void *ctx, nxt_http_field_t *field, uintptr_t data)
{
- nxt_http_request_t *r;
+ nxt_http_request_t *r;
+ static const u_char *upgrade = (const u_char *) "upgrade";
r = ctx;
if (field->value_length == 5 && nxt_memcmp(field->value, "close", 5) == 0) {
r->proto.h1->keepalive = 0;
+
+ } else if (field->value_length == 7
+ && nxt_memcasecmp(field->value, upgrade, 7) == 0)
+ {
+ r->proto.h1->connection_upgrade = 1;
+ }
+
+ return NXT_OK;
+}
+
+
+static nxt_int_t
+nxt_h1p_upgrade(void *ctx, nxt_http_field_t *field, uintptr_t data)
+{
+ nxt_http_request_t *r;
+ static const u_char *websocket = (const u_char *) "websocket";
+
+ r = ctx;
+
+ if (field->value_length == 9
+ && nxt_memcasecmp(field->value, websocket, 9) == 0)
+ {
+ r->proto.h1->upgrade_websocket = 1;
+ }
+
+ return NXT_OK;
+}
+
+
+static nxt_int_t
+nxt_h1p_websocket_key(void *ctx, nxt_http_field_t *field, uintptr_t data)
+{
+ nxt_http_request_t *r;
+
+ r = ctx;
+
+ if (field->value_length == 24) {
+ r->proto.h1->websocket_key = field;
+ }
+
+ return NXT_OK;
+}
+
+
+static nxt_int_t
+nxt_h1p_websocket_version(void *ctx, nxt_http_field_t *field, uintptr_t data)
+{
+ nxt_http_request_t *r;
+
+ r = ctx;
+
+ if (field->value_length == 2
+ && field->value[0] == '1' && field->value[1] == '3')
+ {
+ r->proto.h1->websocket_version_ok = 1;
}
return NXT_OK;
@@ -727,6 +818,7 @@ nxt_h1p_request_body_read(nxt_task_t *task, nxt_http_request_t *r)
if (size != 0) {
in->next = h1p->buffers;
h1p->buffers = in;
+ h1p->nbuffers++;
c = h1p->conn;
c->read = b;
@@ -805,6 +897,15 @@ nxt_h1p_request_local_addr(nxt_task_t *task, nxt_http_request_t *r)
}
+#define NXT_HTTP_LAST_INFORMATIONAL \
+ (NXT_HTTP_CONTINUE + nxt_nitems(nxt_http_informational) - 1)
+
+static const nxt_str_t nxt_http_informational[] = {
+ nxt_string("HTTP/1.1 100 Continue\r\n"),
+ nxt_string("HTTP/1.1 101 Switching Protocols\r\n"),
+};
+
+
#define NXT_HTTP_LAST_SUCCESS \
(NXT_HTTP_OK + nxt_nitems(nxt_http_success) - 1)
@@ -861,7 +962,7 @@ static const nxt_str_t nxt_http_client_error[] = {
nxt_string("HTTP/1.1 423\r\n"),
nxt_string("HTTP/1.1 424\r\n"),
nxt_string("HTTP/1.1 425\r\n"),
- nxt_string("HTTP/1.1 426\r\n"),
+ nxt_string("HTTP/1.1 426 Upgrade Required\r\n"),
nxt_string("HTTP/1.1 427\r\n"),
nxt_string("HTTP/1.1 428\r\n"),
nxt_string("HTTP/1.1 429\r\n"),
@@ -911,10 +1012,14 @@ nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r)
u_char buf[UNKNOWN_STATUS_LENGTH];
static const char chunked[] = "Transfer-Encoding: chunked\r\n";
+ static const char websocket_version[] = "Sec-WebSocket-Version: 13\r\n";
- static const nxt_str_t connection[2] = {
+ static const nxt_str_t connection[3] = {
nxt_string("Connection: close\r\n"),
nxt_string("Connection: keep-alive\r\n"),
+ nxt_string("Upgrade: websocket\r\n"
+ "Connection: Upgrade\r\n"
+ "Sec-WebSocket-Accept: "),
};
nxt_debug(task, "h1p request header send");
@@ -923,7 +1028,10 @@ nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r)
h1p = r->proto.h1;
n = r->status;
- if (n >= NXT_HTTP_OK && n <= NXT_HTTP_LAST_SUCCESS) {
+ if (n >= NXT_HTTP_CONTINUE && n <= NXT_HTTP_LAST_INFORMATIONAL) {
+ status = &nxt_http_informational[n - NXT_HTTP_CONTINUE];
+
+ } else if (n >= NXT_HTTP_OK && n <= NXT_HTTP_LAST_SUCCESS) {
status = &nxt_http_success[n - NXT_HTTP_OK];
} else if (n >= NXT_HTTP_MULTIPLE_CHOICES
@@ -955,27 +1063,41 @@ nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r)
/* Trailing CRLF at the end of header. */
size += nxt_length("\r\n");
- http11 = (h1p->parser.version.s.minor != '0');
+ conn = -1;
+
+ if (r->websocket_handshake && n == NXT_HTTP_SWITCHING_PROTOCOLS) {
+ h1p->websocket = 1;
+ h1p->keepalive = 0;
+ conn = 2;
+ size += NXT_WEBSOCKET_ACCEPT_SIZE + 2;
+
+ } else {
+ http11 = (h1p->parser.version.s.minor != '0');
+
+ if (r->resp.content_length == NULL || r->resp.content_length->skip) {
- if (r->resp.content_length == NULL || r->resp.content_length->skip) {
+ if (http11) {
+ if (n != NXT_HTTP_NOT_MODIFIED
+ && n != NXT_HTTP_NO_CONTENT
+ && !h1p->websocket)
+ {
+ h1p->chunked = 1;
+ size += nxt_length(chunked);
+ /* Trailing CRLF will be added by the first chunk header. */
+ size -= nxt_length("\r\n");
+ }
- if (http11) {
- if (n != NXT_HTTP_NOT_MODIFIED && n != NXT_HTTP_NO_CONTENT) {
- h1p->chunked = 1;
- size += nxt_length(chunked);
- /* Trailing CRLF will be added by the first chunk header. */
- size -= nxt_length("\r\n");
+ } else {
+ h1p->keepalive = 0;
}
+ }
- } else {
- h1p->keepalive = 0;
+ if (http11 ^ h1p->keepalive) {
+ conn = h1p->keepalive;
}
}
- conn = -1;
-
- if (http11 ^ h1p->keepalive) {
- conn = h1p->keepalive;
+ if (conn >= 0) {
size += connection[conn].length;
}
@@ -988,15 +1110,17 @@ nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r)
} nxt_list_loop;
+ if (nxt_slow_path(n == NXT_HTTP_UPGRADE_REQUIRED)) {
+ size += nxt_length(websocket_version);
+ }
+
header = nxt_http_buf_mem(task, r, size);
if (nxt_slow_path(header == NULL)) {
nxt_h1p_request_error(task, h1p, r);
return;
}
- p = header->mem.free;
-
- p = nxt_cpymem(p, status->start, status->length);
+ p = nxt_cpymem(header->mem.free, status->start, status->length);
nxt_list_each(field, r->resp.fields) {
@@ -1013,6 +1137,17 @@ nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r)
p = nxt_cpymem(p, connection[conn].start, connection[conn].length);
}
+ if (h1p->websocket) {
+ nxt_websocket_accept(p, h1p->websocket_key->value);
+ p += NXT_WEBSOCKET_ACCEPT_SIZE;
+
+ *p++ = '\r'; *p++ = '\n';
+ }
+
+ if (nxt_slow_path(n == NXT_HTTP_UPGRADE_REQUIRED)) {
+ p = nxt_cpymem(p, websocket_version, nxt_length(websocket_version));
+ }
+
if (h1p->chunked) {
p = nxt_cpymem(p, chunked, nxt_length(chunked));
/* Trailing CRLF will be added by the first chunk header. */
@@ -1031,6 +1166,58 @@ nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r)
c->write_state = &nxt_h1p_request_send_state;
nxt_conn_write(task->thread->engine, c);
+
+ if (h1p->websocket) {
+ nxt_h1p_websocket_first_frame_start(task, r, c->read);
+ }
+}
+
+
+void
+nxt_h1p_complete_buffers(nxt_task_t *task, nxt_h1proto_t *h1p)
+{
+ size_t size;
+ nxt_buf_t *b, *in, *next;
+ nxt_conn_t *c;
+
+ nxt_debug(task, "h1p complete buffers");
+
+ b = h1p->buffers;
+ c = h1p->conn;
+ in = c->read;
+
+ if (b != NULL) {
+ if (in == NULL) {
+ /* A request with large body. */
+ in = b;
+ c->read = in;
+
+ b = in->next;
+ in->next = NULL;
+ }
+
+ while (b != NULL) {
+ next = b->next;
+
+ nxt_work_queue_add(&task->thread->engine->fast_work_queue,
+ b->completion_handler, task, b, b->parent);
+
+ b = next;
+ }
+
+ h1p->buffers = NULL;
+ h1p->nbuffers = 0;
+ }
+
+ if (in != NULL) {
+ size = nxt_buf_mem_used_size(&in->mem);
+
+ if (size == 0) {
+ nxt_mp_free(c->mem_pool, in);
+
+ c->read = NULL;
+ }
+ }
}
@@ -1181,8 +1368,13 @@ nxt_h1p_conn_request_error(nxt_task_t *task, void *obj, void *data)
r = h1p->request;
+ if (nxt_slow_path(r == NULL)) {
+ nxt_h1p_shutdown(task, h1p->conn);
+ return;
+ }
+
if (r->fields == NULL) {
- (void) nxt_h1p_header_process(h1p, r);
+ (void) nxt_h1p_header_process(task, h1p, r);
}
if (r->status == 0) {
@@ -1218,7 +1410,7 @@ nxt_h1p_conn_request_timeout(nxt_task_t *task, void *obj, void *data)
r = h1p->request;
if (r->fields == NULL) {
- (void) nxt_h1p_header_process(h1p, r);
+ (void) nxt_h1p_header_process(task, h1p, r);
}
nxt_http_request_error(task, r, NXT_HTTP_REQUEST_TIMEOUT);
@@ -1244,7 +1436,7 @@ nxt_h1p_conn_request_send_timeout(nxt_task_t *task, void *obj, void *data)
}
-static nxt_msec_t
+nxt_msec_t
nxt_h1p_conn_request_timer_value(nxt_conn_t *c, uintptr_t data)
{
nxt_h1proto_t *h1p;
@@ -1351,7 +1543,7 @@ static void
nxt_h1p_keepalive(nxt_task_t *task, nxt_h1proto_t *h1p, nxt_conn_t *c)
{
size_t size;
- nxt_buf_t *in, *b, *next;
+ nxt_buf_t *in;
nxt_debug(task, "h1p keepalive");
@@ -1359,40 +1551,22 @@ nxt_h1p_keepalive(nxt_task_t *task, nxt_h1proto_t *h1p, nxt_conn_t *c)
nxt_conn_tcp_nodelay_on(task, c);
}
- b = h1p->buffers;
+ nxt_h1p_complete_buffers(task, h1p);
+
+ in = c->read;
nxt_memzero(h1p, offsetof(nxt_h1proto_t, conn));
c->sent = 0;
- in = c->read;
-
if (in == NULL) {
- /* A request with large body. */
- in = b;
- c->read = in;
-
- b = in->next;
- in->next = NULL;
- }
-
- while (b != NULL) {
- next = b->next;
- nxt_mp_free(c->mem_pool, b);
- b = next;
- }
-
- size = nxt_buf_mem_used_size(&in->mem);
-
- if (size == 0) {
- nxt_mp_free(c->mem_pool, in);
-
- c->read = NULL;
c->read_state = &nxt_h1p_keepalive_state;
nxt_conn_read(task->thread->engine, c);
} else {
+ size = nxt_buf_mem_used_size(&in->mem);
+
nxt_debug(task, "h1p pipelining");
nxt_memmove(in->mem.start, in->mem.pos, size);
@@ -1421,7 +1595,7 @@ static const nxt_conn_state_t nxt_h1p_keepalive_state
};
-static const nxt_conn_state_t nxt_h1p_idle_close_state
+const nxt_conn_state_t nxt_h1p_idle_close_state
nxt_aligned(64) =
{
.close_handler = nxt_h1p_idle_close,
@@ -1564,8 +1738,33 @@ nxt_h1p_idle_response_timer_value(nxt_conn_t *c, uintptr_t data)
static void
nxt_h1p_shutdown(nxt_task_t *task, nxt_conn_t *c)
{
+ nxt_timer_t *timer;
+ nxt_h1proto_t *h1p;
+
nxt_debug(task, "h1p shutdown");
+ h1p = c->socket.data;
+
+ if (nxt_slow_path(h1p != NULL && h1p->websocket_timer != NULL)) {
+ timer = &h1p->websocket_timer->timer;
+
+ if (timer->handler != nxt_h1p_conn_ws_shutdown) {
+ timer->handler = nxt_h1p_conn_ws_shutdown;
+ nxt_timer_add(task->thread->engine, timer, 0);
+
+ } else {
+ nxt_debug(task, "h1p already scheduled ws shutdown");
+ }
+
+ } else {
+ nxt_h1p_shutdown_(task, c);
+ }
+}
+
+
+static void
+nxt_h1p_shutdown_(nxt_task_t *task, nxt_conn_t *c)
+{
c->socket.data = NULL;
#if (NXT_TLS)
@@ -1597,6 +1796,21 @@ static const nxt_conn_state_t nxt_h1p_shutdown_state
static void
+nxt_h1p_conn_ws_shutdown(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_timer_t *timer;
+ nxt_h1p_websocket_timer_t *ws_timer;
+
+ nxt_debug(task, "h1p conn ws shutdown");
+
+ timer = obj;
+ ws_timer = nxt_timer_data(timer, nxt_h1p_websocket_timer_t, timer);
+
+ nxt_h1p_shutdown_(task, ws_timer->h1p->conn);
+}
+
+
+static void
nxt_h1p_conn_closing(nxt_task_t *task, void *obj, void *data)
{
nxt_conn_t *c;
diff --git a/src/nxt_h1proto.h b/src/nxt_h1proto.h
new file mode 100644
index 00000000..c6d3bd53
--- /dev/null
+++ b/src/nxt_h1proto.h
@@ -0,0 +1,48 @@
+
+/*
+ * Copyright (C) NGINX, Inc.
+ */
+
+#ifndef _NXT_H1PROTO_H_INCLUDED_
+#define _NXT_H1PROTO_H_INCLUDED_
+
+
+#include <nxt_main.h>
+#include <nxt_http_parse.h>
+#include <nxt_http.h>
+#include <nxt_router.h>
+
+
+typedef struct nxt_h1p_websocket_timer_s nxt_h1p_websocket_timer_t;
+
+
+struct nxt_h1proto_s {
+ nxt_http_request_parse_t parser;
+
+ uint8_t nbuffers;
+ uint8_t keepalive; /* 1 bit */
+ uint8_t chunked; /* 1 bit */
+ uint8_t websocket; /* 1 bit */
+ uint8_t connection_upgrade; /* 1 bit */
+ uint8_t upgrade_websocket; /* 1 bit */
+ uint8_t websocket_version_ok; /* 1 bit */
+ nxt_http_te_t transfer_encoding:8; /* 2 bits */
+
+ uint8_t websocket_cont_expected; /* 1 bit */
+ uint8_t websocket_closed; /* 1 bit */
+
+ uint32_t header_size;
+
+ nxt_http_field_t *websocket_key;
+ nxt_h1p_websocket_timer_t *websocket_timer;
+
+ nxt_http_request_t *request;
+ nxt_buf_t *buffers;
+ /*
+ * All fields before the conn field will
+ * be zeroed in a keep-alive connection.
+ */
+ nxt_conn_t *conn;
+};
+
+#endif /* _NXT_H1PROTO_H_INCLUDED_ */
diff --git a/src/nxt_h1proto_websocket.c b/src/nxt_h1proto_websocket.c
new file mode 100644
index 00000000..dd9b6848
--- /dev/null
+++ b/src/nxt_h1proto_websocket.c
@@ -0,0 +1,719 @@
+
+/*
+ * Copyright (C) NGINX, Inc.
+ */
+
+#include <nxt_main.h>
+#include <nxt_router.h>
+#include <nxt_http.h>
+#include <nxt_h1proto.h>
+#include <nxt_websocket.h>
+#include <nxt_websocket_header.h>
+
+typedef struct {
+ uint16_t code;
+ uint8_t args;
+ nxt_str_t desc;
+} nxt_ws_error_t;
+
+static void nxt_h1p_conn_ws_keepalive(nxt_task_t *task, void *obj, void *data);
+static void nxt_h1p_conn_ws_frame_header_read(nxt_task_t *task, void *obj,
+ void *data);
+static void nxt_h1p_conn_ws_keepalive_disable(nxt_task_t *task,
+ nxt_h1proto_t *h1p);
+static void nxt_h1p_conn_ws_keepalive_enable(nxt_task_t *task,
+ nxt_h1proto_t *h1p);
+static void nxt_h1p_conn_ws_frame_process(nxt_task_t *task, nxt_conn_t *c,
+ nxt_h1proto_t *h1p, nxt_websocket_header_t *wsh);
+static void nxt_h1p_conn_ws_error(nxt_task_t *task, void *obj, void *data);
+static ssize_t nxt_h1p_ws_io_read_handler(nxt_conn_t *c);
+static void nxt_h1p_conn_ws_timeout(nxt_task_t *task, void *obj, void *data);
+static void nxt_h1p_conn_ws_frame_payload_read(nxt_task_t *task, void *obj,
+ void *data);
+static void hxt_h1p_send_ws_error(nxt_task_t *task, nxt_http_request_t *r,
+ const nxt_ws_error_t *err, ...);
+static void nxt_h1p_conn_ws_error_sent(nxt_task_t *task, void *obj, void *data);
+static void nxt_h1p_conn_ws_pong(nxt_task_t *task, void *obj, void *data);
+
+static const nxt_conn_state_t nxt_h1p_read_ws_frame_header_state;
+static const nxt_conn_state_t nxt_h1p_read_ws_frame_payload_state;
+
+static const nxt_ws_error_t nxt_ws_err_out_of_memory = {
+ NXT_WEBSOCKET_CR_INTERNAL_SERVER_ERROR,
+ 0, nxt_string("Out of memory") };
+static const nxt_ws_error_t nxt_ws_err_too_big = {
+ NXT_WEBSOCKET_CR_MESSAGE_TOO_BIG,
+ 1, nxt_string("Message too big: %uL bytes") };
+static const nxt_ws_error_t nxt_ws_err_invalid_close_code = {
+ NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
+ 1, nxt_string("Close code %ud is not valid") };
+static const nxt_ws_error_t nxt_ws_err_going_away = {
+ NXT_WEBSOCKET_CR_GOING_AWAY,
+ 0, nxt_string("Remote peer is going away") };
+static const nxt_ws_error_t nxt_ws_err_not_masked = {
+ NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
+ 0, nxt_string("Not masked client frame") };
+static const nxt_ws_error_t nxt_ws_err_ctrl_fragmented = {
+ NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
+ 0, nxt_string("Fragmented control frame") };
+static const nxt_ws_error_t nxt_ws_err_ctrl_too_big = {
+ NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
+ 1, nxt_string("Control frame too big: %uL bytes") };
+static const nxt_ws_error_t nxt_ws_err_invalid_close_len = {
+ NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
+ 0, nxt_string("Close frame payload length cannot be 1") };
+static const nxt_ws_error_t nxt_ws_err_invalid_opcode = {
+ NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
+ 1, nxt_string("Unrecognized opcode %ud") };
+static const nxt_ws_error_t nxt_ws_err_cont_expected = {
+ NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
+ 1, nxt_string("Continuation expected, but %ud opcode received") };
+
+void
+nxt_h1p_websocket_first_frame_start(nxt_task_t *task, nxt_http_request_t *r,
+ nxt_buf_t *ws_frame)
+{
+ nxt_conn_t *c;
+ nxt_timer_t *timer;
+ nxt_h1proto_t *h1p;
+ nxt_socket_conf_joint_t *joint;
+
+ nxt_debug(task, "h1p ws first frame start");
+
+ h1p = r->proto.h1;
+ c = h1p->conn;
+
+ if (!c->tcp_nodelay) {
+ nxt_conn_tcp_nodelay_on(task, c);
+ }
+
+ joint = c->listen->socket.data;
+
+ if (nxt_slow_path(joint != NULL
+ && joint->socket_conf->websocket_conf.keepalive_interval != 0))
+ {
+ h1p->websocket_timer = nxt_mp_zget(c->mem_pool,
+ sizeof(nxt_h1p_websocket_timer_t));
+ if (nxt_slow_path(h1p->websocket_timer == NULL)) {
+ hxt_h1p_send_ws_error(task, r, &nxt_ws_err_out_of_memory);
+ return;
+ }
+
+ h1p->websocket_timer->keepalive_interval =
+ joint->socket_conf->websocket_conf.keepalive_interval;
+ h1p->websocket_timer->h1p = h1p;
+
+ timer = &h1p->websocket_timer->timer;
+ timer->task = &c->task;
+ timer->work_queue = &task->thread->engine->fast_work_queue;
+ timer->log = &c->log;
+ timer->bias = NXT_TIMER_DEFAULT_BIAS;
+ timer->handler = nxt_h1p_conn_ws_keepalive;
+ }
+
+ nxt_h1p_websocket_frame_start(task, r, ws_frame);
+}
+
+
+void
+nxt_h1p_websocket_frame_start(nxt_task_t *task, nxt_http_request_t *r,
+ nxt_buf_t *ws_frame)
+{
+ size_t size;
+ nxt_buf_t *in;
+ nxt_conn_t *c;
+ nxt_h1proto_t *h1p;
+
+ nxt_debug(task, "h1p ws frame start");
+
+ h1p = r->proto.h1;
+
+ if (nxt_slow_path(h1p->websocket_closed)) {
+ return;
+ }
+
+ c = h1p->conn;
+ c->read = ws_frame;
+
+ nxt_h1p_complete_buffers(task, h1p);
+
+ in = c->read;
+ c->read_state = &nxt_h1p_read_ws_frame_header_state;
+
+ if (in == NULL) {
+ nxt_conn_read(task->thread->engine, c);
+ nxt_h1p_conn_ws_keepalive_enable(task, h1p);
+
+ } else {
+ size = nxt_buf_mem_used_size(&in->mem);
+
+ nxt_debug(task, "h1p read client ws frame");
+
+ nxt_memmove(in->mem.start, in->mem.pos, size);
+
+ in->mem.pos = in->mem.start;
+ in->mem.free = in->mem.start + size;
+
+ nxt_h1p_conn_ws_frame_header_read(task, c, h1p);
+ }
+}
+
+
+static void
+nxt_h1p_conn_ws_keepalive(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_buf_t *out;
+ nxt_timer_t *timer;
+ nxt_h1proto_t *h1p;
+ nxt_http_request_t *r;
+ nxt_websocket_header_t *wsh;
+ nxt_h1p_websocket_timer_t *ws_timer;
+
+ nxt_debug(task, "h1p conn ws keepalive");
+
+ timer = obj;
+ ws_timer = nxt_timer_data(timer, nxt_h1p_websocket_timer_t, timer);
+ h1p = ws_timer->h1p;
+
+ r = h1p->request;
+ if (nxt_slow_path(r == NULL)) {
+ return;
+ }
+
+ out = nxt_http_buf_mem(task, r, 2);
+ if (nxt_slow_path(out == NULL)) {
+ nxt_http_request_error_handler(task, r, r->proto.any);
+ return;
+ }
+
+ out->mem.start[0] = 0;
+ out->mem.start[1] = 0;
+
+ wsh = (nxt_websocket_header_t *) out->mem.start;
+ out->mem.free = nxt_websocket_frame_init(wsh, 0);
+
+ wsh->fin = 1;
+ wsh->opcode = NXT_WEBSOCKET_OP_PING;
+
+ nxt_http_request_send(task, r, out);
+}
+
+
+static const nxt_conn_state_t nxt_h1p_read_ws_frame_header_state
+ nxt_aligned(64) =
+{
+ .ready_handler = nxt_h1p_conn_ws_frame_header_read,
+ .close_handler = nxt_h1p_conn_ws_error,
+ .error_handler = nxt_h1p_conn_ws_error,
+
+ .io_read_handler = nxt_h1p_ws_io_read_handler,
+
+ .timer_handler = nxt_h1p_conn_ws_timeout,
+ .timer_value = nxt_h1p_conn_request_timer_value,
+ .timer_data = offsetof(nxt_socket_conf_t, websocket_conf.read_timeout),
+ .timer_autoreset = 1,
+};
+
+
+static void
+nxt_h1p_conn_ws_frame_header_read(nxt_task_t *task, void *obj, void *data)
+{
+ size_t size, hsize, frame_size, max_frame_size;
+ uint64_t payload_len;
+ nxt_conn_t *c;
+ nxt_h1proto_t *h1p;
+ nxt_http_request_t *r;
+ nxt_event_engine_t *engine;
+ nxt_websocket_header_t *wsh;
+ nxt_socket_conf_joint_t *joint;
+
+ c = obj;
+ h1p = data;
+
+ nxt_h1p_conn_ws_keepalive_disable(task, h1p);
+
+ size = nxt_buf_mem_used_size(&c->read->mem);
+
+ engine = task->thread->engine;
+
+ if (size < 2) {
+ nxt_debug(task, "h1p conn ws frame header read %z", size);
+
+ nxt_conn_read(engine, c);
+ nxt_h1p_conn_ws_keepalive_enable(task, h1p);
+
+ return;
+ }
+
+ wsh = (nxt_websocket_header_t *) c->read->mem.pos;
+
+ hsize = nxt_websocket_frame_header_size(wsh);
+
+ if (size < hsize) {
+ nxt_debug(task, "h1p conn ws frame header read %z < %z", size, hsize);
+
+ nxt_conn_read(engine, c);
+ nxt_h1p_conn_ws_keepalive_enable(task, h1p);
+
+ return;
+ }
+
+ r = h1p->request;
+ if (nxt_slow_path(r == NULL)) {
+ return;
+ }
+
+ r->ws_frame = c->read;
+
+ joint = c->listen->socket.data;
+
+ if (nxt_slow_path(joint == NULL)) {
+ /*
+ * Listening socket had been closed while
+ * connection was in keep-alive state.
+ */
+ c->read_state = &nxt_h1p_idle_close_state;
+ return;
+ }
+
+ if (nxt_slow_path(wsh->mask == 0)) {
+ hxt_h1p_send_ws_error(task, r, &nxt_ws_err_not_masked);
+ return;
+ }
+
+ if ((wsh->opcode & NXT_WEBSOCKET_OP_CTRL) != 0) {
+ if (nxt_slow_path(wsh->fin == 0)) {
+ hxt_h1p_send_ws_error(task, r, &nxt_ws_err_ctrl_fragmented);
+ return;
+ }
+
+ if (nxt_slow_path(wsh->opcode != NXT_WEBSOCKET_OP_PING
+ && wsh->opcode != NXT_WEBSOCKET_OP_PONG
+ && wsh->opcode != NXT_WEBSOCKET_OP_CLOSE))
+ {
+ hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_opcode,
+ wsh->opcode);
+ return;
+ }
+
+ if (nxt_slow_path(wsh->payload_len > 125)) {
+ hxt_h1p_send_ws_error(task, r, &nxt_ws_err_ctrl_too_big,
+ nxt_websocket_frame_payload_len(wsh));
+ return;
+ }
+
+ if (nxt_slow_path(wsh->opcode == NXT_WEBSOCKET_OP_CLOSE
+ && wsh->payload_len == 1))
+ {
+ hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_close_len);
+ return;
+ }
+
+ } else {
+ if (h1p->websocket_cont_expected) {
+ if (nxt_slow_path(wsh->opcode != NXT_WEBSOCKET_OP_CONT)) {
+ hxt_h1p_send_ws_error(task, r, &nxt_ws_err_cont_expected,
+ wsh->opcode);
+ return;
+ }
+
+ } else {
+ if (nxt_slow_path(wsh->opcode != NXT_WEBSOCKET_OP_BINARY
+ && wsh->opcode != NXT_WEBSOCKET_OP_TEXT))
+ {
+ hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_opcode,
+ wsh->opcode);
+ return;
+ }
+ }
+
+ h1p->websocket_cont_expected = !wsh->fin;
+ }
+
+ max_frame_size = joint->socket_conf->websocket_conf.max_frame_size;
+
+ payload_len = nxt_websocket_frame_payload_len(wsh);
+
+ if (nxt_slow_path(hsize > max_frame_size
+ || payload_len > (max_frame_size - hsize)))
+ {
+ hxt_h1p_send_ws_error(task, r, &nxt_ws_err_too_big, payload_len);
+ return;
+ }
+
+ c->read_state = &nxt_h1p_read_ws_frame_payload_state;
+
+ frame_size = payload_len + hsize;
+
+ nxt_debug(task, "h1p conn ws frame header read: %z, %z", size, frame_size);
+
+ if (frame_size <= size) {
+ nxt_h1p_conn_ws_frame_process(task, c, h1p, wsh);
+
+ return;
+ }
+
+ if (frame_size < (size_t) nxt_buf_mem_size(&c->read->mem)) {
+ c->read->mem.end = c->read->mem.start + frame_size;
+
+ } else {
+ nxt_buf_t *b = nxt_buf_mem_alloc(c->mem_pool, frame_size - size, 0);
+
+ c->read->next = b;
+ c->read = b;
+ }
+
+ nxt_conn_read(engine, c);
+ nxt_h1p_conn_ws_keepalive_enable(task, h1p);
+}
+
+
+static void
+nxt_h1p_conn_ws_keepalive_disable(nxt_task_t *task, nxt_h1proto_t *h1p)
+{
+ nxt_timer_t *timer;
+
+ if (h1p->websocket_timer == NULL) {
+ return;
+ }
+
+ timer = &h1p->websocket_timer->timer;
+
+ if (nxt_slow_path(timer->handler != nxt_h1p_conn_ws_keepalive)) {
+ nxt_debug(task, "h1p ws keepalive disable: scheduled ws shutdown");
+ return;
+ }
+
+ nxt_timer_disable(task->thread->engine, timer);
+}
+
+
+static void
+nxt_h1p_conn_ws_keepalive_enable(nxt_task_t *task, nxt_h1proto_t *h1p)
+{
+ nxt_timer_t *timer;
+
+ if (h1p->websocket_timer == NULL) {
+ return;
+ }
+
+ timer = &h1p->websocket_timer->timer;
+
+ if (nxt_slow_path(timer->handler != nxt_h1p_conn_ws_keepalive)) {
+ nxt_debug(task, "h1p ws keepalive enable: scheduled ws shutdown");
+ return;
+ }
+
+ nxt_timer_add(task->thread->engine, timer,
+ h1p->websocket_timer->keepalive_interval);
+}
+
+
+static void
+nxt_h1p_conn_ws_frame_process(nxt_task_t *task, nxt_conn_t *c,
+ nxt_h1proto_t *h1p, nxt_websocket_header_t *wsh)
+{
+ size_t hsize;
+ uint8_t *p, *mask;
+ uint16_t code;
+ nxt_http_request_t *r;
+ nxt_event_engine_t *engine;
+
+ engine = task->thread->engine;
+ r = h1p->request;
+
+ c->read = NULL;
+
+ if (nxt_slow_path(wsh->opcode == NXT_WEBSOCKET_OP_PING)) {
+ nxt_work_queue_add(&engine->fast_work_queue, nxt_h1p_conn_ws_pong,
+ task, r, NULL);
+ return;
+ }
+
+ if (nxt_slow_path(wsh->opcode == NXT_WEBSOCKET_OP_CLOSE)) {
+ if (wsh->payload_len >= 2) {
+ hsize = nxt_websocket_frame_header_size(wsh);
+ mask = nxt_pointer_to(wsh, hsize - 4);
+ p = nxt_pointer_to(wsh, hsize);
+
+ code = ((p[0] ^ mask[0]) << 8) + (p[1] ^ mask[1]);
+
+ if (nxt_slow_path(code < 1000 || code >= 5000
+ || (code > 1003 && code < 1007)
+ || (code > 1014 && code < 3000)))
+ {
+ hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_close_code,
+ code);
+ return;
+ }
+ }
+
+ h1p->websocket_closed = 1;
+ }
+
+ nxt_work_queue_add(&engine->fast_work_queue, r->state->ready_handler,
+ task, r, NULL);
+}
+
+
+static void
+nxt_h1p_conn_ws_error(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_h1proto_t *h1p;
+ nxt_http_request_t *r;
+
+ h1p = data;
+
+ nxt_debug(task, "h1p conn ws error");
+
+ r = h1p->request;
+
+ h1p->keepalive = 0;
+
+ if (nxt_fast_path(r != NULL)) {
+ r->state->error_handler(task, r, h1p);
+ }
+}
+
+
+static ssize_t
+nxt_h1p_ws_io_read_handler(nxt_conn_t *c)
+{
+ size_t size;
+ ssize_t n;
+ nxt_buf_t *b;
+
+ b = c->read;
+
+ if (b == NULL) {
+ /* Enough for control frame. */
+ size = 10 + 125;
+
+ b = nxt_buf_mem_alloc(c->mem_pool, size, 0);
+ if (nxt_slow_path(b == NULL)) {
+ c->socket.error = NXT_ENOMEM;
+ return NXT_ERROR;
+ }
+ }
+
+ n = c->io->recvbuf(c, b);
+
+ if (n > 0) {
+ c->read = b;
+
+ } else {
+ c->read = NULL;
+ nxt_mp_free(c->mem_pool, b);
+ }
+
+ return n;
+}
+
+
+static void
+nxt_h1p_conn_ws_timeout(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_conn_t *c;
+ nxt_timer_t *timer;
+ nxt_h1proto_t *h1p;
+ nxt_http_request_t *r;
+
+ timer = obj;
+
+ nxt_debug(task, "h1p conn ws timeout");
+
+ c = nxt_read_timer_conn(timer);
+ c->block_read = 1;
+ /*
+ * Disable SO_LINGER off during socket closing
+ * to send "408 Request Timeout" error response.
+ */
+ c->socket.timedout = 0;
+
+ h1p = c->socket.data;
+ h1p->keepalive = 0;
+
+ r = h1p->request;
+ if (nxt_slow_path(r == NULL)) {
+ return;
+ }
+
+ hxt_h1p_send_ws_error(task, r, &nxt_ws_err_going_away);
+}
+
+
+static const nxt_conn_state_t nxt_h1p_read_ws_frame_payload_state
+ nxt_aligned(64) =
+{
+ .ready_handler = nxt_h1p_conn_ws_frame_payload_read,
+ .close_handler = nxt_h1p_conn_ws_error,
+ .error_handler = nxt_h1p_conn_ws_error,
+
+ .timer_handler = nxt_h1p_conn_ws_timeout,
+ .timer_value = nxt_h1p_conn_request_timer_value,
+ .timer_data = offsetof(nxt_socket_conf_t, websocket_conf.read_timeout),
+ .timer_autoreset = 1,
+};
+
+
+static void
+nxt_h1p_conn_ws_frame_payload_read(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_conn_t *c;
+ nxt_h1proto_t *h1p;
+ nxt_http_request_t *r;
+ nxt_event_engine_t *engine;
+ nxt_websocket_header_t *wsh;
+
+ c = obj;
+ h1p = data;
+
+ nxt_h1p_conn_ws_keepalive_disable(task, h1p);
+
+ nxt_debug(task, "h1p conn ws frame read");
+
+ if (nxt_buf_mem_free_size(&c->read->mem) == 0) {
+ r = h1p->request;
+ if (nxt_slow_path(r == NULL)) {
+ return;
+ }
+
+ wsh = (nxt_websocket_header_t *) r->ws_frame->mem.pos;
+
+ nxt_h1p_conn_ws_frame_process(task, c, h1p, wsh);
+
+ return;
+ }
+
+ engine = task->thread->engine;
+
+ nxt_conn_read(engine, c);
+ nxt_h1p_conn_ws_keepalive_enable(task, h1p);
+}
+
+
+static void
+hxt_h1p_send_ws_error(nxt_task_t *task, nxt_http_request_t *r,
+ const nxt_ws_error_t *err, ...)
+{
+ u_char *p;
+ va_list args;
+ nxt_buf_t *out;
+ nxt_str_t desc;
+ nxt_websocket_header_t *wsh;
+ u_char buf[125];
+
+ if (nxt_slow_path(err->args)) {
+ va_start(args, err);
+ p = nxt_vsprintf(buf, buf + sizeof(buf), (char *) err->desc.start,
+ args);
+ va_end(args);
+
+ desc.start = buf;
+ desc.length = p - buf;
+
+ } else {
+ desc = err->desc;
+ }
+
+ nxt_log(task, NXT_LOG_INFO, "websocket error %d: %V", err->code, &desc);
+
+ out = nxt_http_buf_mem(task, r, 2 + sizeof(err->code) + desc.length);
+ if (nxt_slow_path(out == NULL)) {
+ nxt_http_request_error_handler(task, r, r->proto.any);
+ return;
+ }
+
+ out->mem.start[0] = 0;
+ out->mem.start[1] = 0;
+
+ wsh = (nxt_websocket_header_t *) out->mem.start;
+ p = nxt_websocket_frame_init(wsh, sizeof(err->code) + desc.length);
+
+ wsh->fin = 1;
+ wsh->opcode = NXT_WEBSOCKET_OP_CLOSE;
+
+ *p++ = (err->code >> 8) & 0xFF;
+ *p++ = err->code & 0xFF;
+
+ out->mem.free = nxt_cpymem(p, desc.start, desc.length);
+ out->next = nxt_http_buf_last(r);
+
+ if (out->next != NULL) {
+ out->next->completion_handler = nxt_h1p_conn_ws_error_sent;
+ }
+
+ nxt_http_request_send(task, r, out);
+}
+
+
+static void
+nxt_h1p_conn_ws_error_sent(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_http_request_t *r;
+
+ r = data;
+
+ nxt_debug(task, "h1p conn ws error sent");
+
+ r->state->error_handler(task, r, r->proto.any);
+}
+
+
+static void
+nxt_h1p_conn_ws_pong(nxt_task_t *task, void *obj, void *data)
+{
+ uint8_t payload_len, i;
+ nxt_buf_t *b, *out, *next;
+ nxt_http_request_t *r;
+ nxt_websocket_header_t *wsh;
+ uint8_t mask[4];
+
+ nxt_debug(task, "h1p conn ws pong");
+
+ r = obj;
+ b = r->ws_frame;
+
+ wsh = (nxt_websocket_header_t *) b->mem.pos;
+ payload_len = wsh->payload_len;
+
+ b->mem.pos += 2;
+
+ nxt_memcpy(mask, b->mem.pos, 4);
+
+ b->mem.pos += 4;
+
+ out = nxt_http_buf_mem(task, r, 2 + payload_len);
+ if (nxt_slow_path(out == NULL)) {
+ nxt_http_request_error_handler(task, r, r->proto.any);
+ return;
+ }
+
+ out->mem.start[0] = 0;
+ out->mem.start[1] = 0;
+
+ wsh = (nxt_websocket_header_t *) out->mem.start;
+ out->mem.free = nxt_websocket_frame_init(wsh, payload_len);
+
+ wsh->fin = 1;
+ wsh->opcode = NXT_WEBSOCKET_OP_PONG;
+
+ for (i = 0; i < payload_len; i++) {
+ while (nxt_buf_mem_used_size(&b->mem) == 0) {
+ next = b->next;
+
+ nxt_work_queue_add(&task->thread->engine->fast_work_queue,
+ b->completion_handler, task, b, b->parent);
+
+ b = next;
+ }
+
+ *out->mem.free++ = *b->mem.pos++ ^ mask[i % 4];
+ }
+
+ r->ws_frame = b;
+
+ nxt_http_request_send(task, r, out);
+
+ nxt_http_request_ws_frame_start(task, r, r->ws_frame);
+}
diff --git a/src/nxt_http.h b/src/nxt_http.h
index c1a230ec..ac1eedcf 100644
--- a/src/nxt_http.h
+++ b/src/nxt_http.h
@@ -11,6 +11,9 @@
typedef enum {
NXT_HTTP_INVALID = 0,
+ NXT_HTTP_CONTINUE = 100,
+ NXT_HTTP_SWITCHING_PROTOCOLS = 101,
+
NXT_HTTP_OK = 200,
NXT_HTTP_NO_CONTENT = 204,
@@ -26,6 +29,7 @@ typedef enum {
NXT_HTTP_LENGTH_REQUIRED = 411,
NXT_HTTP_PAYLOAD_TOO_LARGE = 413,
NXT_HTTP_URI_TOO_LONG = 414,
+ NXT_HTTP_UPGRADE_REQUIRED = 426,
NXT_HTTP_REQUEST_HEADER_FIELDS_TOO_LARGE = 431,
NXT_HTTP_TO_HTTPS = 497,
@@ -61,6 +65,13 @@ typedef struct {
typedef struct nxt_h1proto_s nxt_h1proto_t;
+struct nxt_h1p_websocket_timer_s {
+ nxt_timer_t timer;
+ nxt_h1proto_t *h1p;
+ nxt_msec_t keepalive_interval;
+};
+
+
typedef union {
void *any;
nxt_h1proto_t *h1;
@@ -99,6 +110,7 @@ struct nxt_http_request_s {
nxt_mp_t *mem_pool;
nxt_buf_t *body;
+ nxt_buf_t *ws_frame;
nxt_buf_t *out;
const nxt_http_request_state_t *state;
@@ -127,6 +139,8 @@ struct nxt_http_request_s {
nxt_timer_t timer;
void *timer_data;
+ void *req_rpc_data;
+
nxt_buf_t *last;
nxt_http_response_t resp;
@@ -138,6 +152,7 @@ struct nxt_http_request_s {
uint8_t logged; /* 1 bit */
uint8_t header_sent; /* 1 bit */
uint8_t error; /* 1 bit */
+ uint8_t websocket_handshake; /* 1 bit */
};
@@ -166,6 +181,8 @@ typedef struct {
void (*discard)(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *last);
void (*close)(nxt_task_t *task, nxt_http_proto_t proto,
nxt_socket_conf_joint_t *joint);
+ void (*ws_frame_start)(nxt_task_t *task, nxt_http_request_t *r,
+ nxt_buf_t *ws_frame);
} nxt_http_proto_table_t;
@@ -179,12 +196,15 @@ void nxt_http_request_error(nxt_task_t *task, nxt_http_request_t *r,
nxt_http_status_t status);
void nxt_http_request_read_body(nxt_task_t *task, nxt_http_request_t *r);
void nxt_http_request_header_send(nxt_task_t *task, nxt_http_request_t *r);
+void nxt_http_request_ws_frame_start(nxt_task_t *task, nxt_http_request_t *r,
+ nxt_buf_t *ws_frame);
void nxt_http_request_send(nxt_task_t *task, nxt_http_request_t *r,
nxt_buf_t *out);
nxt_buf_t *nxt_http_buf_mem(nxt_task_t *task, nxt_http_request_t *r,
size_t size);
nxt_buf_t *nxt_http_buf_last(nxt_http_request_t *r);
void nxt_http_request_error_handler(nxt_task_t *task, void *obj, void *data);
+void nxt_http_request_close_handler(nxt_task_t *task, void *obj, void *data);
nxt_int_t nxt_http_request_host(void *ctx, nxt_http_field_t *field,
uintptr_t data);
@@ -212,5 +232,13 @@ extern nxt_lvlhsh_t nxt_response_fields_hash;
extern const nxt_http_proto_table_t nxt_http_proto[];
+void nxt_h1p_websocket_first_frame_start(nxt_task_t *task,
+ nxt_http_request_t *r, nxt_buf_t *ws_frame);
+void nxt_h1p_websocket_frame_start(nxt_task_t *task, nxt_http_request_t *r,
+ nxt_buf_t *ws_frame);
+void nxt_h1p_complete_buffers(nxt_task_t *task, nxt_h1proto_t *h1p);
+nxt_msec_t nxt_h1p_conn_request_timer_value(nxt_conn_t *c, uintptr_t data);
+
+extern const nxt_conn_state_t nxt_h1p_idle_close_state;
#endif /* _NXT_HTTP_H_INCLUDED_ */
diff --git a/src/nxt_http_request.c b/src/nxt_http_request.c
index 1ab22223..916004d2 100644
--- a/src/nxt_http_request.c
+++ b/src/nxt_http_request.c
@@ -16,8 +16,6 @@ static void nxt_http_request_proto_info(nxt_task_t *task,
static void nxt_http_request_mem_buf_completion(nxt_task_t *task, void *obj,
void *data);
static void nxt_http_request_done(nxt_task_t *task, void *obj, void *data);
-static void nxt_http_request_close_handler(nxt_task_t *task, void *obj,
- void *data);
static u_char *nxt_http_date(u_char *buf, nxt_realtime_t *now, struct tm *tm,
size_t size, const char *format);
@@ -444,6 +442,16 @@ fail:
void
+nxt_http_request_ws_frame_start(nxt_task_t *task, nxt_http_request_t *r,
+ nxt_buf_t *ws_frame)
+{
+ if (r->proto.any != NULL) {
+ nxt_http_proto[r->protocol].ws_frame_start(task, r, ws_frame);
+ }
+}
+
+
+void
nxt_http_request_send(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *out)
{
if (nxt_fast_path(r->proto.any != NULL)) {
@@ -530,7 +538,7 @@ nxt_http_request_error_handler(nxt_task_t *task, void *obj, void *data)
}
-static void
+void
nxt_http_request_close_handler(nxt_task_t *task, void *obj, void *data)
{
nxt_http_proto_t proto;
@@ -556,12 +564,13 @@ nxt_http_request_close_handler(nxt_task_t *task, void *obj, void *data)
}
}
- protocol = r->protocol;
-
r->proto.any = NULL;
- nxt_mp_release(r->mem_pool);
if (nxt_fast_path(proto.any != NULL)) {
+ protocol = r->protocol;
+
+ nxt_mp_release(r->mem_pool);
+
nxt_http_proto[protocol].close(task, proto, conf);
}
}
diff --git a/src/nxt_http_response.c b/src/nxt_http_response.c
index 755182db..00ecff00 100644
--- a/src/nxt_http_response.c
+++ b/src/nxt_http_response.c
@@ -28,6 +28,8 @@ static nxt_http_field_proc_t nxt_response_fields[] = {
offsetof(nxt_http_request_t, resp.content_type) },
{ nxt_string("Content-Length"), &nxt_http_response_field,
offsetof(nxt_http_request_t, resp.content_length) },
+ { nxt_string("Upgrade"), &nxt_http_response_skip, 0 },
+ { nxt_string("Sec-WebSocket-Accept"), &nxt_http_response_skip, 0 },
};
diff --git a/src/nxt_http_websocket.c b/src/nxt_http_websocket.c
new file mode 100644
index 00000000..d58d615c
--- /dev/null
+++ b/src/nxt_http_websocket.c
@@ -0,0 +1,161 @@
+
+/*
+ * Copyright (C) NGINX, Inc.
+ */
+
+#include <nxt_main.h>
+#include <nxt_router.h>
+#include <nxt_http.h>
+#include <nxt_router_request.h>
+#include <nxt_port_memory_int.h>
+#include <nxt_websocket.h>
+#include <nxt_websocket_header.h>
+
+
+static void nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data);
+static void nxt_http_websocket_error_handler(nxt_task_t *task, void *obj,
+ void *data);
+
+
+const nxt_http_request_state_t nxt_http_websocket
+ nxt_aligned(64) =
+{
+ .ready_handler = nxt_http_websocket_client,
+ .error_handler = nxt_http_websocket_error_handler,
+};
+
+
+static void
+nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data)
+{
+ size_t frame_size, used_size, copy_size, buf_free_size;
+ size_t chunk_copy_size;
+ nxt_buf_t *out, *buf, **out_tail, *b, *next;
+ nxt_int_t res;
+ nxt_http_request_t *r;
+ nxt_request_app_link_t *req_app_link;
+ nxt_request_rpc_data_t *req_rpc_data;
+ nxt_websocket_header_t *wsh;
+
+ r = obj;
+
+ if (nxt_slow_path((req_rpc_data = r->req_rpc_data) == NULL
+ || (req_app_link = req_rpc_data->req_app_link) == NULL))
+ {
+ nxt_debug(task, "websocket client frame for destroyed request");
+
+ return;
+ }
+
+ nxt_debug(task, "http websocket client frame");
+
+ wsh = (nxt_websocket_header_t *) r->ws_frame->mem.pos;
+
+ frame_size = nxt_websocket_frame_header_size(wsh)
+ + nxt_websocket_frame_payload_len(wsh);
+
+ buf = NULL;
+ buf_free_size = 0;
+ out = NULL;
+ out_tail = &out;
+
+ b = r->ws_frame;
+
+ while (b != NULL && frame_size > 0) {
+ used_size = nxt_buf_mem_used_size(&b->mem);
+ copy_size = nxt_min(used_size, frame_size);
+
+ while (copy_size > 0) {
+ if (buf == NULL || buf_free_size == 0) {
+ buf_free_size = nxt_min(frame_size, PORT_MMAP_DATA_SIZE);
+
+ buf = nxt_port_mmap_get_buf(task, req_app_link->app_port,
+ buf_free_size);
+
+ *out_tail = buf;
+ out_tail = &buf->next;
+ }
+
+ chunk_copy_size = nxt_min(buf_free_size, copy_size);
+
+ buf->mem.free = nxt_cpymem(buf->mem.free, b->mem.pos,
+ chunk_copy_size);
+
+ copy_size -= chunk_copy_size;
+ b->mem.pos += chunk_copy_size;
+ buf_free_size -= chunk_copy_size;
+ }
+
+ frame_size -= copy_size;
+ next = b->next;
+
+ if (nxt_buf_mem_used_size(&b->mem) == 0) {
+ nxt_work_queue_add(&task->thread->engine->fast_work_queue,
+ b->completion_handler, task, b, b->parent);
+
+ r->ws_frame = next;
+ }
+
+ b = next;
+ }
+
+ res = nxt_port_socket_twrite(task, req_app_link->app_port,
+ NXT_PORT_MSG_WEBSOCKET, -1,
+ req_app_link->stream,
+ req_app_link->reply_port->id, out, NULL);
+ if (nxt_slow_path(res != NXT_OK)) {
+ // TODO: handle
+ }
+
+ b = r->ws_frame;
+
+ if (b != NULL) {
+ used_size = nxt_buf_mem_used_size(&b->mem);
+
+ if (used_size > 0) {
+ nxt_memmove(b->mem.start, b->mem.pos, used_size);
+
+ b->mem.pos = b->mem.start;
+ b->mem.free = b->mem.start + used_size;
+ }
+ }
+
+ nxt_http_request_ws_frame_start(task, r, r->ws_frame);
+}
+
+
+static void
+nxt_http_websocket_error_handler(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_http_request_t *r;
+ nxt_request_app_link_t *req_app_link;
+ nxt_request_rpc_data_t *req_rpc_data;
+
+ nxt_debug(task, "http websocket error handler");
+
+ r = obj;
+
+ if ((req_rpc_data = r->req_rpc_data) == NULL) {
+ nxt_debug(task, " req_rpc_data is NULL");
+ goto close_handler;
+ }
+
+ if ((req_app_link = req_rpc_data->req_app_link) == NULL) {
+ nxt_debug(task, " req_app_link is NULL");
+ goto close_handler;
+ }
+
+ if (req_app_link->app_port == NULL) {
+ nxt_debug(task, " app_port is NULL");
+ goto close_handler;
+ }
+
+ (void) nxt_port_socket_twrite(task, req_app_link->app_port,
+ NXT_PORT_MSG_WEBSOCKET_LAST,
+ -1, req_app_link->stream,
+ req_app_link->reply_port->id, NULL, NULL);
+
+close_handler:
+
+ nxt_http_request_close_handler(task, obj, data);
+}
diff --git a/src/nxt_port.c b/src/nxt_port.c
index aff46666..cef65cab 100644
--- a/src/nxt_port.c
+++ b/src/nxt_port.c
@@ -68,6 +68,7 @@ nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid,
nxt_queue_init(&port->messages);
nxt_thread_mutex_create(&port->write_mutex);
nxt_queue_init(&port->pending_requests);
+ nxt_queue_init(&port->active_websockets);
} else {
nxt_mp_destroy(mp);
diff --git a/src/nxt_port.h b/src/nxt_port.h
index e4e76693..eeb6caa5 100644
--- a/src/nxt_port.h
+++ b/src/nxt_port.h
@@ -36,6 +36,12 @@ struct nxt_port_handlers_s {
/* Stop process command. */
nxt_port_handler_t quit;
+ /* Request headers. */
+ nxt_port_handler_t req_headers;
+
+ /* Websocket frame. */
+ nxt_port_handler_t websocket_frame;
+
/* Various data. */
nxt_port_handler_t data;
};
@@ -71,6 +77,9 @@ typedef enum {
_NXT_PORT_MSG_REMOVE_PID = nxt_port_handler_idx(remove_pid),
_NXT_PORT_MSG_QUIT = nxt_port_handler_idx(quit),
+ _NXT_PORT_MSG_REQ_HEADERS = nxt_port_handler_idx(req_headers),
+ _NXT_PORT_MSG_WEBSOCKET = nxt_port_handler_idx(websocket_frame),
+
_NXT_PORT_MSG_DATA = nxt_port_handler_idx(data),
NXT_PORT_MSG_MAX = sizeof(nxt_port_handlers_t)
@@ -99,6 +108,10 @@ typedef enum {
NXT_PORT_MSG_QUIT = _NXT_PORT_MSG_QUIT | NXT_PORT_MSG_LAST,
NXT_PORT_MSG_REMOVE_PID = _NXT_PORT_MSG_REMOVE_PID | NXT_PORT_MSG_LAST,
+ NXT_PORT_MSG_REQ_HEADERS = _NXT_PORT_MSG_REQ_HEADERS,
+ NXT_PORT_MSG_WEBSOCKET = _NXT_PORT_MSG_WEBSOCKET,
+ NXT_PORT_MSG_WEBSOCKET_LAST = _NXT_PORT_MSG_WEBSOCKET | NXT_PORT_MSG_LAST,
+
NXT_PORT_MSG_DATA = _NXT_PORT_MSG_DATA,
NXT_PORT_MSG_DATA_LAST = _NXT_PORT_MSG_DATA | NXT_PORT_MSG_LAST,
} nxt_port_msg_type_t;
@@ -181,6 +194,8 @@ struct nxt_port_s {
uint32_t app_responses;
nxt_queue_t pending_requests;
+ nxt_queue_t active_websockets;
+
nxt_port_handler_t handler;
nxt_port_handler_t *data;
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 566e0c65..b87f588f 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -14,7 +14,7 @@
#include <nxt_port_memory_int.h>
#include <nxt_unit_request.h>
#include <nxt_unit_response.h>
-
+#include <nxt_router_request.h>
typedef struct {
nxt_str_t type;
@@ -48,64 +48,6 @@ typedef struct {
#endif
-typedef struct nxt_msg_info_s {
- nxt_buf_t *buf;
- nxt_port_mmap_tracking_t tracking;
- nxt_work_handler_t completion_handler;
-} nxt_msg_info_t;
-
-
-typedef struct nxt_request_app_link_s nxt_request_app_link_t;
-
-
-typedef enum {
- NXT_APR_NEW_PORT,
- NXT_APR_REQUEST_FAILED,
- NXT_APR_GOT_RESPONSE,
- NXT_APR_CLOSE,
-} nxt_apr_action_t;
-
-
-typedef struct {
- uint32_t stream;
- nxt_app_t *app;
-
- nxt_port_t *app_port;
- nxt_apr_action_t apr_action;
-
- nxt_http_request_t *request;
- nxt_msg_info_t msg_info;
- nxt_request_app_link_t *req_app_link;
-} nxt_request_rpc_data_t;
-
-
-struct nxt_request_app_link_s {
- uint32_t stream;
- nxt_atomic_t use_count;
-
- nxt_port_t *app_port;
- nxt_apr_action_t apr_action;
-
- nxt_port_t *reply_port;
- nxt_http_request_t *request;
- nxt_msg_info_t msg_info;
- nxt_request_rpc_data_t *req_rpc_data;
-
- nxt_nsec_t res_time;
-
- nxt_queue_link_t link_app_requests; /* for nxt_app_t.requests */
- /* for nxt_port_t.pending_requests */
- nxt_queue_link_t link_port_pending;
- nxt_queue_link_t link_app_pending; /* for nxt_app_t.pending */
-
- nxt_mp_t *mem_pool;
- nxt_work_t work;
-
- int err_code;
- const char *err_str;
-};
-
-
typedef struct {
nxt_socket_conf_t *socket_conf;
nxt_router_temp_conf_t *temp_conf;
@@ -305,6 +247,8 @@ static nxt_int_t nxt_router_http_request_done(nxt_task_t *task,
static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
void *data);
+const nxt_http_request_state_t nxt_http_websocket;
+
static nxt_router_t *nxt_router;
static const nxt_str_t http_prefix = nxt_string("HTTP_");
@@ -663,6 +607,7 @@ nxt_request_app_link_release(nxt_task_t *task,
nxt_request_app_link_t *req_app_link)
{
nxt_mp_t *mp;
+ nxt_http_request_t *r;
nxt_request_rpc_data_t *req_rpc_data;
nxt_assert(task->thread->engine == req_app_link->work.data);
@@ -683,10 +628,11 @@ nxt_request_app_link_release(nxt_task_t *task,
req_rpc_data->msg_info = req_app_link->msg_info;
if (req_rpc_data->app->timeout != 0) {
- req_rpc_data->request->timer.handler = nxt_router_app_timeout;
- req_rpc_data->request->timer_data = req_rpc_data;
- nxt_timer_add(task->thread->engine,
- &req_rpc_data->request->timer,
+ r = req_rpc_data->request;
+
+ r->timer.handler = nxt_router_app_timeout;
+ r->timer_data = req_rpc_data;
+ nxt_timer_add(task->thread->engine, &r->timer,
req_rpc_data->app->timeout);
}
@@ -833,14 +779,16 @@ nxt_request_rpc_data_unlink(nxt_task_t *task,
if (req_app_link->link_app_requests.next == NULL
&& req_app_link->link_port_pending.next == NULL
- && req_app_link->link_app_pending.next == NULL)
+ && req_app_link->link_app_pending.next == NULL
+ && req_app_link->link_port_websockets.next == NULL)
{
req_app_link = NULL;
} else {
ra_use_delta -=
nxt_queue_chk_remove(&req_app_link->link_app_requests)
- + nxt_queue_chk_remove(&req_app_link->link_port_pending);
+ + nxt_queue_chk_remove(&req_app_link->link_port_pending)
+ + nxt_queue_chk_remove(&req_app_link->link_port_websockets);
nxt_queue_chk_remove(&req_app_link->link_app_pending);
}
@@ -863,6 +811,7 @@ nxt_request_rpc_data_unlink(nxt_task_t *task,
nxt_router_http_request_done(task, req_rpc_data->request);
+ req_rpc_data->request->req_rpc_data = NULL;
req_rpc_data->request = NULL;
}
}
@@ -1412,6 +1361,28 @@ static nxt_conf_map_t nxt_router_http_conf[] = {
};
+static nxt_conf_map_t nxt_router_websocket_conf[] = {
+ {
+ nxt_string("max_frame_size"),
+ NXT_CONF_MAP_SIZE,
+ offsetof(nxt_websocket_conf_t, max_frame_size),
+ },
+
+ {
+ nxt_string("read_timeout"),
+ NXT_CONF_MAP_MSEC,
+ offsetof(nxt_websocket_conf_t, read_timeout),
+ },
+
+ {
+ nxt_string("keepalive_interval"),
+ NXT_CONF_MAP_MSEC,
+ offsetof(nxt_websocket_conf_t, keepalive_interval),
+ },
+
+};
+
+
static nxt_int_t
nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
u_char *start, u_char *end)
@@ -1425,7 +1396,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_app_t *app, *prev;
nxt_router_t *router;
nxt_app_joint_t *app_joint;
- nxt_conf_value_t *conf, *http, *value;
+ nxt_conf_value_t *conf, *http, *value, *websocket;
nxt_conf_value_t *applications, *application;
nxt_conf_value_t *listeners, *listener;
nxt_conf_value_t *routes_conf;
@@ -1448,6 +1419,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
#if (NXT_TLS)
static nxt_str_t certificate_path = nxt_string("/tls/certificate");
#endif
+ static nxt_str_t websocket_path = nxt_string("/settings/http/websocket");
conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
if (conf == NULL) {
@@ -1658,6 +1630,8 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
}
#endif
+ websocket = nxt_conf_get_path(conf, &websocket_path);
+
listeners = nxt_conf_get_path(conf, &listeners_path);
if (listeners != NULL) {
@@ -1697,6 +1671,10 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
skcf->body_read_timeout = 30 * 1000;
skcf->send_timeout = 30 * 1000;
+ skcf->websocket_conf.max_frame_size = 1024 * 1024;
+ skcf->websocket_conf.read_timeout = 60 * 1000;
+ skcf->websocket_conf.keepalive_interval = 30 * 1000;
+
if (http != NULL) {
ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
nxt_nitems(nxt_router_http_conf),
@@ -1707,6 +1685,17 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
}
}
+ if (websocket != NULL) {
+ ret = nxt_conf_map_object(mp, websocket,
+ nxt_router_websocket_conf,
+ nxt_nitems(nxt_router_websocket_conf),
+ &skcf->websocket_conf);
+ if (ret != NXT_OK) {
+ nxt_alert(task, "websocket map error");
+ goto fail;
+ }
+ }
+
#if (NXT_TLS)
value = nxt_conf_get_path(listener, &certificate_path);
@@ -3418,10 +3407,12 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
{
nxt_int_t ret;
nxt_buf_t *b;
+ nxt_port_t *app_port;
nxt_unit_field_t *f;
nxt_http_field_t *field;
nxt_http_request_t *r;
nxt_unit_response_t *resp;
+ nxt_request_app_link_t *req_app_link;
nxt_request_rpc_data_t *req_rpc_data;
b = msg->buf;
@@ -3542,7 +3533,48 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_http_request_header_send(task, r);
- r->state = &nxt_http_request_send_state;
+ if (r->websocket_handshake
+ && r->status == NXT_HTTP_SWITCHING_PROTOCOLS)
+ {
+ req_app_link = nxt_request_app_link_alloc(task,
+ req_rpc_data->req_app_link,
+ req_rpc_data);
+ if (nxt_slow_path(req_app_link == NULL)) {
+ goto fail;
+ }
+
+ app_port = req_app_link->app_port;
+
+ if (app_port == NULL && req_rpc_data->app_port != NULL) {
+ req_app_link->app_port = req_rpc_data->app_port;
+ app_port = req_app_link->app_port;
+ req_app_link->apr_action = req_rpc_data->apr_action;
+
+ req_rpc_data->app_port = NULL;
+ }
+
+ if (nxt_slow_path(app_port == NULL)) {
+ goto fail;
+ }
+
+ nxt_thread_mutex_lock(&req_rpc_data->app->mutex);
+
+ nxt_queue_insert_tail(&app_port->active_websockets,
+ &req_app_link->link_port_websockets);
+
+ nxt_thread_mutex_unlock(&req_rpc_data->app->mutex);
+
+ nxt_router_app_port_release(task, app_port, NXT_APR_UPGRADE);
+ req_app_link->apr_action = NXT_APR_CLOSE;
+
+ nxt_debug(task, "req_app_link stream #%uD upgrade",
+ req_app_link->stream);
+
+ r->state = &nxt_http_websocket;
+
+ } else {
+ r->state = &nxt_http_request_send_state;
+ }
if (r->out) {
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
@@ -3924,6 +3956,10 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
got_response = 1;
inc_use = -1;
break;
+ case NXT_APR_UPGRADE:
+ dec_pending = 1;
+ got_response = 1;
+ break;
case NXT_APR_CLOSE:
inc_use = -1;
break;
@@ -4046,9 +4082,10 @@ re_ra_cancelled:
adjust_idle_timer = 0;
- if (port->pair[1] != -1 && !send_quit && port->app_pending_responses == 0) {
- nxt_assert(port->idle_link.next == NULL);
-
+ if (port->pair[1] != -1 && !send_quit && port->app_pending_responses == 0
+ && nxt_queue_is_empty(&port->active_websockets)
+ && port->idle_link.next == NULL)
+ {
if (app->idle_processes == app->spare_processes
&& app->adjust_idle_work.data == NULL)
{
@@ -4545,6 +4582,7 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
nxt_router_app_use(task, app, 1);
req_rpc_data->request = r;
+ r->req_rpc_data = req_rpc_data;
req_app_link = &ra_local;
nxt_request_app_link_init(task, req_app_link, req_rpc_data);
@@ -4635,7 +4673,7 @@ nxt_router_app_prepare_request(nxt_task_t *task,
goto release_port;
}
- res = nxt_port_socket_twrite(task, port, NXT_PORT_MSG_DATA,
+ res = nxt_port_socket_twrite(task, port, NXT_PORT_MSG_REQ_HEADERS,
-1, req_app_link->stream, reply_port->id, buf,
&req_app_link->msg_info.tracking);
@@ -4785,6 +4823,7 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
*p++ = '\0';
req->tls = (r->tls != NULL);
+ req->websocket_handshake = r->websocket_handshake;
req->server_name_length = r->server_name.length;
nxt_unit_sptr_set(&req->server_name, p);
diff --git a/src/nxt_router.h b/src/nxt_router.h
index ff791e3d..b55a4de3 100644
--- a/src/nxt_router.h
+++ b/src/nxt_router.h
@@ -139,6 +139,13 @@ struct nxt_app_s {
typedef struct {
+ size_t max_frame_size;
+ nxt_msec_t read_timeout;
+ nxt_msec_t keepalive_interval;
+} nxt_websocket_conf_t;
+
+
+typedef struct {
uint32_t count;
nxt_queue_link_t link;
nxt_router_conf_t *router_conf;
@@ -164,6 +171,8 @@ typedef struct {
nxt_msec_t body_read_timeout;
nxt_msec_t send_timeout;
+ nxt_websocket_conf_t websocket_conf;
+
#if (NXT_TLS)
nxt_tls_conf_t *tls;
#endif
diff --git a/src/nxt_router_request.h b/src/nxt_router_request.h
new file mode 100644
index 00000000..c3d5767e
--- /dev/null
+++ b/src/nxt_router_request.h
@@ -0,0 +1,71 @@
+
+/*
+ * Copyright (C) NGINX, Inc.
+ */
+
+#ifndef _NXT_ROUTER_REQUEST_H_INCLUDED_
+#define _NXT_ROUTER_REQUEST_H_INCLUDED_
+
+
+typedef struct nxt_msg_info_s {
+ nxt_buf_t *buf;
+ nxt_port_mmap_tracking_t tracking;
+ nxt_work_handler_t completion_handler;
+} nxt_msg_info_t;
+
+
+typedef struct nxt_request_app_link_s nxt_request_app_link_t;
+
+
+typedef enum {
+ NXT_APR_NEW_PORT,
+ NXT_APR_REQUEST_FAILED,
+ NXT_APR_GOT_RESPONSE,
+ NXT_APR_UPGRADE,
+ NXT_APR_CLOSE,
+} nxt_apr_action_t;
+
+
+typedef struct {
+ uint32_t stream;
+ nxt_app_t *app;
+
+ nxt_port_t *app_port;
+ nxt_apr_action_t apr_action;
+
+ nxt_http_request_t *request;
+ nxt_msg_info_t msg_info;
+ nxt_request_app_link_t *req_app_link;
+} nxt_request_rpc_data_t;
+
+
+struct nxt_request_app_link_s {
+ uint32_t stream;
+ nxt_atomic_t use_count;
+
+ nxt_port_t *app_port;
+ nxt_apr_action_t apr_action;
+
+ nxt_port_t *reply_port;
+ nxt_http_request_t *request;
+ nxt_msg_info_t msg_info;
+ nxt_request_rpc_data_t *req_rpc_data;
+
+ nxt_nsec_t res_time;
+
+ nxt_queue_link_t link_app_requests; /* for nxt_app_t.requests */
+ /* for nxt_port_t.pending_requests */
+ nxt_queue_link_t link_port_pending;
+ nxt_queue_link_t link_app_pending; /* for nxt_app_t.pending */
+ /* for nxt_port_t.active_websockets */
+ nxt_queue_link_t link_port_websockets;
+
+ nxt_mp_t *mem_pool;
+ nxt_work_t work;
+
+ int err_code;
+ const char *err_str;
+};
+
+
+#endif /* _NXT_ROUTER_REQUEST_H_INCLUDED_ */
diff --git a/src/nxt_sha1.c b/src/nxt_sha1.c
new file mode 100644
index 00000000..407c5933
--- /dev/null
+++ b/src/nxt_sha1.c
@@ -0,0 +1,295 @@
+
+/*
+ * Copyright (C) Maxim Dounin
+ * Copyright (C) NGINX, Inc.
+ *
+ * An internal SHA1 implementation.
+ */
+
+
+#include <nxt_main.h>
+#include <nxt_sha1.h>
+
+
+static const u_char *nxt_sha1_body(nxt_sha1_t *ctx, const u_char *data,
+ size_t size);
+
+
+void
+nxt_sha1_init(nxt_sha1_t *ctx)
+{
+ ctx->a = 0x67452301;
+ ctx->b = 0xefcdab89;
+ ctx->c = 0x98badcfe;
+ ctx->d = 0x10325476;
+ ctx->e = 0xc3d2e1f0;
+
+ ctx->bytes = 0;
+}
+
+
+void
+nxt_sha1_update(nxt_sha1_t *ctx, const void *data, size_t size)
+{
+ size_t used, free;
+
+ used = (size_t) (ctx->bytes & 0x3f);
+ ctx->bytes += size;
+
+ if (used) {
+ free = 64 - used;
+
+ if (size < free) {
+ memcpy(&ctx->buffer[used], data, size);
+ return;
+ }
+
+ memcpy(&ctx->buffer[used], data, free);
+ data = (u_char *) data + free;
+ size -= free;
+ (void) nxt_sha1_body(ctx, ctx->buffer, 64);
+ }
+
+ if (size >= 64) {
+ data = nxt_sha1_body(ctx, data, size & ~(size_t) 0x3f);
+ size &= 0x3f;
+ }
+
+ memcpy(ctx->buffer, data, size);
+}
+
+
+void
+nxt_sha1_final(u_char result[20], nxt_sha1_t *ctx)
+{
+ size_t used, free;
+
+ used = (size_t) (ctx->bytes & 0x3f);
+
+ ctx->buffer[used++] = 0x80;
+
+ free = 64 - used;
+
+ if (free < 8) {
+ nxt_memzero(&ctx->buffer[used], free);
+ (void) nxt_sha1_body(ctx, ctx->buffer, 64);
+ used = 0;
+ free = 64;
+ }
+
+ nxt_memzero(&ctx->buffer[used], free - 8);
+
+ ctx->bytes <<= 3;
+ ctx->buffer[56] = (u_char) (ctx->bytes >> 56);
+ ctx->buffer[57] = (u_char) (ctx->bytes >> 48);
+ ctx->buffer[58] = (u_char) (ctx->bytes >> 40);
+ ctx->buffer[59] = (u_char) (ctx->bytes >> 32);
+ ctx->buffer[60] = (u_char) (ctx->bytes >> 24);
+ ctx->buffer[61] = (u_char) (ctx->bytes >> 16);
+ ctx->buffer[62] = (u_char) (ctx->bytes >> 8);
+ ctx->buffer[63] = (u_char) ctx->bytes;
+
+ (void) nxt_sha1_body(ctx, ctx->buffer, 64);
+
+ result[0] = (u_char) (ctx->a >> 24);
+ result[1] = (u_char) (ctx->a >> 16);
+ result[2] = (u_char) (ctx->a >> 8);
+ result[3] = (u_char) ctx->a;
+ result[4] = (u_char) (ctx->b >> 24);
+ result[5] = (u_char) (ctx->b >> 16);
+ result[6] = (u_char) (ctx->b >> 8);
+ result[7] = (u_char) ctx->b;
+ result[8] = (u_char) (ctx->c >> 24);
+ result[9] = (u_char) (ctx->c >> 16);
+ result[10] = (u_char) (ctx->c >> 8);
+ result[11] = (u_char) ctx->c;
+ result[12] = (u_char) (ctx->d >> 24);
+ result[13] = (u_char) (ctx->d >> 16);
+ result[14] = (u_char) (ctx->d >> 8);
+ result[15] = (u_char) ctx->d;
+ result[16] = (u_char) (ctx->e >> 24);
+ result[17] = (u_char) (ctx->e >> 16);
+ result[18] = (u_char) (ctx->e >> 8);
+ result[19] = (u_char) ctx->e;
+
+ nxt_memzero(ctx, sizeof(*ctx));
+}
+
+
+/*
+ * Helper functions.
+ */
+
+#define ROTATE(bits, word) (((word) << (bits)) | ((word) >> (32 - (bits))))
+
+#define F1(b, c, d) (((b) & (c)) | ((~(b)) & (d)))
+#define F2(b, c, d) ((b) ^ (c) ^ (d))
+#define F3(b, c, d) (((b) & (c)) | ((b) & (d)) | ((c) & (d)))
+
+#define STEP(f, a, b, c, d, e, w, t) \
+ temp = ROTATE(5, (a)) + f((b), (c), (d)) + (e) + (w) + (t); \
+ (e) = (d); \
+ (d) = (c); \
+ (c) = ROTATE(30, (b)); \
+ (b) = (a); \
+ (a) = temp;
+
+
+/*
+ * GET() reads 4 input bytes in big-endian byte order and returns
+ * them as uint32_t.
+ */
+
+#define GET(n) \
+ ( ((uint32_t) p[n * 4 + 3]) \
+ | ((uint32_t) p[n * 4 + 2] << 8) \
+ | ((uint32_t) p[n * 4 + 1] << 16) \
+ | ((uint32_t) p[n * 4] << 24))
+
+
+/*
+ * This processes one or more 64-byte data blocks, but does not update
+ * the bit counters. There are no alignment requirements.
+ */
+
+static const u_char *
+nxt_sha1_body(nxt_sha1_t *ctx, const u_char *data, size_t size)
+{
+ uint32_t a, b, c, d, e, temp;
+ uint32_t saved_a, saved_b, saved_c, saved_d, saved_e;
+ uint32_t words[80];
+ nxt_uint_t i;
+ const u_char *p;
+
+ p = data;
+
+ a = ctx->a;
+ b = ctx->b;
+ c = ctx->c;
+ d = ctx->d;
+ e = ctx->e;
+
+ do {
+ saved_a = a;
+ saved_b = b;
+ saved_c = c;
+ saved_d = d;
+ saved_e = e;
+
+ /* Load data block into the words array */
+
+ for (i = 0; i < 16; i++) {
+ words[i] = GET(i);
+ }
+
+ for (i = 16; i < 80; i++) {
+ words[i] = ROTATE(1, words[i - 3]
+ ^ words[i - 8]
+ ^ words[i - 14]
+ ^ words[i - 16]);
+ }
+
+ /* Transformations */
+
+ STEP(F1, a, b, c, d, e, words[0], 0x5a827999);
+ STEP(F1, a, b, c, d, e, words[1], 0x5a827999);
+ STEP(F1, a, b, c, d, e, words[2], 0x5a827999);
+ STEP(F1, a, b, c, d, e, words[3], 0x5a827999);
+ STEP(F1, a, b, c, d, e, words[4], 0x5a827999);
+ STEP(F1, a, b, c, d, e, words[5], 0x5a827999);
+ STEP(F1, a, b, c, d, e, words[6], 0x5a827999);
+ STEP(F1, a, b, c, d, e, words[7], 0x5a827999);
+ STEP(F1, a, b, c, d, e, words[8], 0x5a827999);
+ STEP(F1, a, b, c, d, e, words[9], 0x5a827999);
+ STEP(F1, a, b, c, d, e, words[10], 0x5a827999);
+ STEP(F1, a, b, c, d, e, words[11], 0x5a827999);
+ STEP(F1, a, b, c, d, e, words[12], 0x5a827999);
+ STEP(F1, a, b, c, d, e, words[13], 0x5a827999);
+ STEP(F1, a, b, c, d, e, words[14], 0x5a827999);
+ STEP(F1, a, b, c, d, e, words[15], 0x5a827999);
+ STEP(F1, a, b, c, d, e, words[16], 0x5a827999);
+ STEP(F1, a, b, c, d, e, words[17], 0x5a827999);
+ STEP(F1, a, b, c, d, e, words[18], 0x5a827999);
+ STEP(F1, a, b, c, d, e, words[19], 0x5a827999);
+
+ STEP(F2, a, b, c, d, e, words[20], 0x6ed9eba1);
+ STEP(F2, a, b, c, d, e, words[21], 0x6ed9eba1);
+ STEP(F2, a, b, c, d, e, words[22], 0x6ed9eba1);
+ STEP(F2, a, b, c, d, e, words[23], 0x6ed9eba1);
+ STEP(F2, a, b, c, d, e, words[24], 0x6ed9eba1);
+ STEP(F2, a, b, c, d, e, words[25], 0x6ed9eba1);
+ STEP(F2, a, b, c, d, e, words[26], 0x6ed9eba1);
+ STEP(F2, a, b, c, d, e, words[27], 0x6ed9eba1);
+ STEP(F2, a, b, c, d, e, words[28], 0x6ed9eba1);
+ STEP(F2, a, b, c, d, e, words[29], 0x6ed9eba1);
+ STEP(F2, a, b, c, d, e, words[30], 0x6ed9eba1);
+ STEP(F2, a, b, c, d, e, words[31], 0x6ed9eba1);
+ STEP(F2, a, b, c, d, e, words[32], 0x6ed9eba1);
+ STEP(F2, a, b, c, d, e, words[33], 0x6ed9eba1);
+ STEP(F2, a, b, c, d, e, words[34], 0x6ed9eba1);
+ STEP(F2, a, b, c, d, e, words[35], 0x6ed9eba1);
+ STEP(F2, a, b, c, d, e, words[36], 0x6ed9eba1);
+ STEP(F2, a, b, c, d, e, words[37], 0x6ed9eba1);
+ STEP(F2, a, b, c, d, e, words[38], 0x6ed9eba1);
+ STEP(F2, a, b, c, d, e, words[39], 0x6ed9eba1);
+
+ STEP(F3, a, b, c, d, e, words[40], 0x8f1bbcdc);
+ STEP(F3, a, b, c, d, e, words[41], 0x8f1bbcdc);
+ STEP(F3, a, b, c, d, e, words[42], 0x8f1bbcdc);
+ STEP(F3, a, b, c, d, e, words[43], 0x8f1bbcdc);
+ STEP(F3, a, b, c, d, e, words[44], 0x8f1bbcdc);
+ STEP(F3, a, b, c, d, e, words[45], 0x8f1bbcdc);
+ STEP(F3, a, b, c, d, e, words[46], 0x8f1bbcdc);
+ STEP(F3, a, b, c, d, e, words[47], 0x8f1bbcdc);
+ STEP(F3, a, b, c, d, e, words[48], 0x8f1bbcdc);
+ STEP(F3, a, b, c, d, e, words[49], 0x8f1bbcdc);
+ STEP(F3, a, b, c, d, e, words[50], 0x8f1bbcdc);
+ STEP(F3, a, b, c, d, e, words[51], 0x8f1bbcdc);
+ STEP(F3, a, b, c, d, e, words[52], 0x8f1bbcdc);
+ STEP(F3, a, b, c, d, e, words[53], 0x8f1bbcdc);
+ STEP(F3, a, b, c, d, e, words[54], 0x8f1bbcdc);
+ STEP(F3, a, b, c, d, e, words[55], 0x8f1bbcdc);
+ STEP(F3, a, b, c, d, e, words[56], 0x8f1bbcdc);
+ STEP(F3, a, b, c, d, e, words[57], 0x8f1bbcdc);
+ STEP(F3, a, b, c, d, e, words[58], 0x8f1bbcdc);
+ STEP(F3, a, b, c, d, e, words[59], 0x8f1bbcdc);
+
+ STEP(F2, a, b, c, d, e, words[60], 0xca62c1d6);
+ STEP(F2, a, b, c, d, e, words[61], 0xca62c1d6);
+ STEP(F2, a, b, c, d, e, words[62], 0xca62c1d6);
+ STEP(F2, a, b, c, d, e, words[63], 0xca62c1d6);
+ STEP(F2, a, b, c, d, e, words[64], 0xca62c1d6);
+ STEP(F2, a, b, c, d, e, words[65], 0xca62c1d6);
+ STEP(F2, a, b, c, d, e, words[66], 0xca62c1d6);
+ STEP(F2, a, b, c, d, e, words[67], 0xca62c1d6);
+ STEP(F2, a, b, c, d, e, words[68], 0xca62c1d6);
+ STEP(F2, a, b, c, d, e, words[69], 0xca62c1d6);
+ STEP(F2, a, b, c, d, e, words[70], 0xca62c1d6);
+ STEP(F2, a, b, c, d, e, words[71], 0xca62c1d6);
+ STEP(F2, a, b, c, d, e, words[72], 0xca62c1d6);
+ STEP(F2, a, b, c, d, e, words[73], 0xca62c1d6);
+ STEP(F2, a, b, c, d, e, words[74], 0xca62c1d6);
+ STEP(F2, a, b, c, d, e, words[75], 0xca62c1d6);
+ STEP(F2, a, b, c, d, e, words[76], 0xca62c1d6);
+ STEP(F2, a, b, c, d, e, words[77], 0xca62c1d6);
+ STEP(F2, a, b, c, d, e, words[78], 0xca62c1d6);
+ STEP(F2, a, b, c, d, e, words[79], 0xca62c1d6);
+
+ a += saved_a;
+ b += saved_b;
+ c += saved_c;
+ d += saved_d;
+ e += saved_e;
+
+ p += 64;
+
+ } while (size -= 64);
+
+ ctx->a = a;
+ ctx->b = b;
+ ctx->c = c;
+ ctx->d = d;
+ ctx->e = e;
+
+ return p;
+}
diff --git a/src/nxt_sha1.h b/src/nxt_sha1.h
new file mode 100644
index 00000000..2816982b
--- /dev/null
+++ b/src/nxt_sha1.h
@@ -0,0 +1,24 @@
+
+/*
+ * Copyright (C) Igor Sysoev
+ * Copyright (C) NGINX, Inc.
+ */
+
+
+#ifndef _NXT_SHA1_H_INCLUDED_
+#define _NXT_SHA1_H_INCLUDED_
+
+
+typedef struct {
+ uint64_t bytes;
+ uint32_t a, b, c, d, e;
+ u_char buffer[64];
+} nxt_sha1_t;
+
+
+NXT_EXPORT void nxt_sha1_init(nxt_sha1_t *ctx);
+NXT_EXPORT void nxt_sha1_update(nxt_sha1_t *ctx, const void *data, size_t size);
+NXT_EXPORT void nxt_sha1_final(u_char result[20], nxt_sha1_t *ctx);
+
+
+#endif /* _NXT_SHA1_H_INCLUDED_ */
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index 88c7fa6a..28a0de20 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -11,38 +11,60 @@
#include "nxt_unit.h"
#include "nxt_unit_request.h"
#include "nxt_unit_response.h"
+#include "nxt_unit_websocket.h"
+
+#include "nxt_websocket.h"
#if (NXT_HAVE_MEMFD_CREATE)
#include <linux/memfd.h>
#endif
-typedef struct nxt_unit_impl_s nxt_unit_impl_t;
-typedef struct nxt_unit_mmap_s nxt_unit_mmap_t;
-typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t;
-typedef struct nxt_unit_process_s nxt_unit_process_t;
-typedef struct nxt_unit_mmap_buf_s nxt_unit_mmap_buf_t;
-typedef struct nxt_unit_recv_msg_s nxt_unit_recv_msg_t;
-typedef struct nxt_unit_ctx_impl_s nxt_unit_ctx_impl_t;
-typedef struct nxt_unit_port_impl_s nxt_unit_port_impl_t;
-typedef struct nxt_unit_request_info_impl_s nxt_unit_request_info_impl_t;
+typedef struct nxt_unit_impl_s nxt_unit_impl_t;
+typedef struct nxt_unit_mmap_s nxt_unit_mmap_t;
+typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t;
+typedef struct nxt_unit_process_s nxt_unit_process_t;
+typedef struct nxt_unit_mmap_buf_s nxt_unit_mmap_buf_t;
+typedef struct nxt_unit_recv_msg_s nxt_unit_recv_msg_t;
+typedef struct nxt_unit_ctx_impl_s nxt_unit_ctx_impl_t;
+typedef struct nxt_unit_port_impl_s nxt_unit_port_impl_t;
+typedef struct nxt_unit_request_info_impl_s nxt_unit_request_info_impl_t;
+typedef struct nxt_unit_websocket_frame_impl_s nxt_unit_websocket_frame_impl_t;
static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init);
static void nxt_unit_ctx_init(nxt_unit_impl_t *lib,
nxt_unit_ctx_impl_t *ctx_impl, void *data);
+nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
+ nxt_unit_mmap_buf_t *mmap_buf);
+nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
+ nxt_unit_mmap_buf_t *mmap_buf);
+nxt_inline void nxt_unit_mmap_buf_remove(nxt_unit_mmap_buf_t *mmap_buf);
static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream);
static int nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
uint32_t stream);
+static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
+ nxt_unit_recv_msg_t *recv_msg);
+static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
+ nxt_unit_recv_msg_t *recv_msg);
+static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
+ nxt_unit_recv_msg_t *recv_msg);
static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
nxt_unit_ctx_t *ctx);
static void nxt_unit_request_info_release(nxt_unit_request_info_t *req);
static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req);
+static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get(
+ nxt_unit_ctx_t *ctx);
+static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws);
+static void nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws);
static nxt_unit_process_t *nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx);
static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf);
static int nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
nxt_unit_mmap_buf_t *mmap_buf, int last);
+static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf);
+static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst,
+ size_t size);
static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx,
nxt_unit_process_t *process, nxt_unit_port_id_t *port_id,
nxt_chunk_id_t *c, int n);
@@ -65,7 +87,7 @@ static nxt_port_mmap_header_t *nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx,
static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
- nxt_unit_recv_msg_t *recv_msg, nxt_queue_t *incoming_buf);
+ nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_mmap_release(nxt_port_mmap_header_t *hdr, void *start,
uint32_t size);
@@ -98,14 +120,22 @@ static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
static nxt_unit_port_impl_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
nxt_unit_port_id_t *port_id, int remove);
+static int nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash,
+ nxt_unit_request_info_impl_t *req_impl);
+static nxt_unit_request_info_impl_t *nxt_unit_request_hash_find(
+ nxt_lvlhsh_t *request_hash, uint32_t stream, int remove);
+
static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level);
struct nxt_unit_mmap_buf_s {
nxt_unit_buf_t buf;
+ nxt_unit_mmap_buf_t *next;
+ nxt_unit_mmap_buf_t **prev;
+
nxt_port_mmap_header_t *hdr;
- nxt_queue_link_t link;
+// nxt_queue_link_t link;
nxt_unit_port_id_t port_id;
nxt_unit_request_info_t *req;
nxt_unit_ctx_impl_t *ctx_impl;
@@ -113,12 +143,20 @@ struct nxt_unit_mmap_buf_s {
struct nxt_unit_recv_msg_s {
- nxt_port_msg_t port_msg;
+ uint32_t stream;
+ nxt_pid_t pid;
+ nxt_port_id_t reply_port;
+
+ uint8_t last; /* 1 bit */
+ uint8_t mmap; /* 1 bit */
void *start;
uint32_t size;
+ int fd;
nxt_unit_process_t *process;
+
+ nxt_unit_mmap_buf_t *incoming_buf;
};
@@ -127,18 +165,22 @@ typedef enum {
NXT_UNIT_RS_RESPONSE_INIT,
NXT_UNIT_RS_RESPONSE_HAS_CONTENT,
NXT_UNIT_RS_RESPONSE_SENT,
- NXT_UNIT_RS_DONE,
+ NXT_UNIT_RS_RELEASED,
} nxt_unit_req_state_t;
struct nxt_unit_request_info_impl_s {
nxt_unit_request_info_t req;
- nxt_unit_recv_msg_t recv_msg;
- nxt_queue_t outgoing_buf; /* of nxt_unit_mmap_buf_t */
- nxt_queue_t incoming_buf; /* of nxt_unit_mmap_buf_t */
+ uint32_t stream;
+
+ nxt_unit_process_t *process;
+
+ nxt_unit_mmap_buf_t *outgoing_buf;
+ nxt_unit_mmap_buf_t *incoming_buf;
nxt_unit_req_state_t state;
+ uint8_t websocket;
nxt_queue_link_t link;
@@ -146,6 +188,19 @@ struct nxt_unit_request_info_impl_s {
};
+struct nxt_unit_websocket_frame_impl_s {
+ nxt_unit_websocket_frame_t ws;
+
+ nxt_unit_mmap_buf_t *buf;
+
+ nxt_queue_link_t link;
+
+ nxt_unit_ctx_impl_t *ctx_impl;
+
+ void *retain_buf;
+};
+
+
struct nxt_unit_ctx_impl_s {
nxt_unit_ctx_t ctx;
@@ -154,14 +209,20 @@ struct nxt_unit_ctx_impl_s {
nxt_queue_link_t link;
- nxt_queue_t free_buf; /* of nxt_unit_mmap_buf_t */
+ nxt_unit_mmap_buf_t *free_buf;
/* of nxt_unit_request_info_impl_t */
nxt_queue_t free_req;
+ /* of nxt_unit_websocket_frame_impl_t */
+ nxt_queue_t free_ws;
+
/* of nxt_unit_request_info_impl_t */
nxt_queue_t active_req;
+ /* of nxt_unit_request_info_impl_t */
+ nxt_lvlhsh_t requests;
+
nxt_unit_mmap_buf_t ctx_buf[2];
nxt_unit_request_info_impl_t req;
@@ -394,18 +455,65 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
- nxt_queue_init(&ctx_impl->free_buf);
nxt_queue_init(&ctx_impl->free_req);
+ nxt_queue_init(&ctx_impl->free_ws);
nxt_queue_init(&ctx_impl->active_req);
- nxt_queue_insert_tail(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0].link);
- nxt_queue_insert_tail(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1].link);
+ ctx_impl->free_buf = NULL;
+ nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
+ nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]);
+
nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);
ctx_impl->req.req.ctx = &ctx_impl->ctx;
ctx_impl->req.req.unit = &lib->unit;
ctx_impl->read_port_fd = -1;
+ ctx_impl->requests.slot = 0;
+}
+
+
+nxt_inline void
+nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
+ nxt_unit_mmap_buf_t *mmap_buf)
+{
+ mmap_buf->next = *head;
+
+ if (mmap_buf->next != NULL) {
+ mmap_buf->next->prev = &mmap_buf->next;
+ }
+
+ *head = mmap_buf;
+ mmap_buf->prev = head;
+}
+
+
+nxt_inline void
+nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
+ nxt_unit_mmap_buf_t *mmap_buf)
+{
+ while (*prev != NULL) {
+ prev = &(*prev)->next;
+ }
+
+ nxt_unit_mmap_buf_insert(prev, mmap_buf);
+}
+
+
+nxt_inline void
+nxt_unit_mmap_buf_remove(nxt_unit_mmap_buf_t *mmap_buf)
+{
+ nxt_unit_mmap_buf_t **prev;
+
+ prev = mmap_buf->prev;
+
+ if (mmap_buf->next != NULL) {
+ mmap_buf->next->prev = prev;
+ }
+
+ if (prev != NULL) {
+ *prev = mmap_buf->next;
+ }
}
@@ -509,26 +617,18 @@ int
nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
void *buf, size_t buf_size, void *oob, size_t oob_size)
{
- int fd, rc, nb;
- pid_t pid;
- nxt_queue_t incoming_buf;
- struct cmsghdr *cm;
- nxt_port_msg_t *port_msg;
- nxt_unit_impl_t *lib;
- nxt_unit_port_t new_port;
- nxt_queue_link_t *lnk;
- nxt_unit_request_t *r;
- nxt_unit_mmap_buf_t *b;
- nxt_unit_recv_msg_t recv_msg;
- nxt_unit_callbacks_t *cb;
- nxt_port_msg_new_port_t *new_port_msg;
- nxt_unit_request_info_t *req;
- nxt_unit_request_info_impl_t *req_impl;
+ int rc;
+ pid_t pid;
+ struct cmsghdr *cm;
+ nxt_port_msg_t *port_msg;
+ nxt_unit_impl_t *lib;
+ nxt_unit_recv_msg_t recv_msg;
+ nxt_unit_callbacks_t *cb;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
rc = NXT_UNIT_ERROR;
- fd = -1;
+ recv_msg.fd = -1;
recv_msg.process = NULL;
port_msg = buf;
cm = oob;
@@ -538,17 +638,22 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
&& cm->cmsg_level == SOL_SOCKET
&& cm->cmsg_type == SCM_RIGHTS)
{
- memcpy(&fd, CMSG_DATA(cm), sizeof(int));
+ memcpy(&recv_msg.fd, CMSG_DATA(cm), sizeof(int));
}
- nxt_queue_init(&incoming_buf);
+ recv_msg.incoming_buf = NULL;
if (nxt_slow_path(buf_size < sizeof(nxt_port_msg_t))) {
nxt_unit_warn(ctx, "message too small (%d bytes)", (int) buf_size);
goto fail;
}
- recv_msg.port_msg = *port_msg;
+ recv_msg.stream = port_msg->stream;
+ recv_msg.pid = port_msg->pid;
+ recv_msg.reply_port = port_msg->reply_port;
+ recv_msg.last = port_msg->last;
+ recv_msg.mmap = port_msg->mmap;
+
recv_msg.start = port_msg + 1;
recv_msg.size = buf_size - sizeof(nxt_port_msg_t);
@@ -572,7 +677,7 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
}
if (port_msg->mmap) {
- if (nxt_unit_mmap_read(ctx, &recv_msg, &incoming_buf) != NXT_UNIT_OK) {
+ if (nxt_unit_mmap_read(ctx, &recv_msg) != NXT_UNIT_OK) {
goto fail;
}
}
@@ -589,187 +694,326 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
break;
case _NXT_PORT_MSG_NEW_PORT:
- if (nxt_slow_path(recv_msg.size != sizeof(nxt_port_msg_new_port_t))) {
- nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
- "invalid message size (%d)",
- port_msg->stream, (int) recv_msg.size);
+ rc = nxt_unit_process_new_port(ctx, &recv_msg);
+ break;
- goto fail;
- }
+ case _NXT_PORT_MSG_CHANGE_FILE:
+ nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
+ port_msg->stream, recv_msg.fd);
+ break;
- if (nxt_slow_path(fd < 0)) {
- nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
- port_msg->stream, fd);
+ case _NXT_PORT_MSG_MMAP:
+ if (nxt_slow_path(recv_msg.fd < 0)) {
+ nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap",
+ port_msg->stream, recv_msg.fd);
goto fail;
}
- new_port_msg = recv_msg.start;
+ rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd);
+ break;
- nxt_unit_debug(ctx, "#%"PRIu32": new_port: %d,%d fd %d",
- port_msg->stream, (int) new_port_msg->pid,
- (int) new_port_msg->id, fd);
+ case _NXT_PORT_MSG_REQ_HEADERS:
+ rc = nxt_unit_process_req_headers(ctx, &recv_msg);
+ break;
- nb = 0;
+ case _NXT_PORT_MSG_WEBSOCKET:
+ rc = nxt_unit_process_websocket(ctx, &recv_msg);
+ break;
- if (nxt_slow_path(ioctl(fd, FIONBIO, &nb) == -1)) {
- nxt_unit_alert(ctx, "#%"PRIu32": new_port: ioctl(%d, FIONBIO, 0) "
- "failed: %s (%d)", fd, strerror(errno), errno);
+ case _NXT_PORT_MSG_REMOVE_PID:
+ if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
+ nxt_unit_warn(ctx, "#%"PRIu32": remove_pid: invalid message size "
+ "(%d != %d)", port_msg->stream, (int) recv_msg.size,
+ (int) sizeof(pid));
goto fail;
}
- nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
- new_port_msg->id);
+ memcpy(&pid, recv_msg.start, sizeof(pid));
- new_port.in_fd = -1;
- new_port.out_fd = fd;
- new_port.data = NULL;
+ nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d",
+ port_msg->stream, (int) pid);
- fd = -1;
+ cb->remove_pid(ctx, pid);
- rc = cb->add_port(ctx, &new_port);
+ rc = NXT_UNIT_OK;
break;
- case _NXT_PORT_MSG_CHANGE_FILE:
- nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
- port_msg->stream, fd);
- break;
+ default:
+ nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d",
+ port_msg->stream, (int) port_msg->type);
- case _NXT_PORT_MSG_MMAP:
- if (nxt_slow_path(fd < 0)) {
- nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap",
- port_msg->stream, fd);
+ goto fail;
+ }
- goto fail;
- }
+fail:
- rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, fd);
- break;
+ if (recv_msg.fd != -1) {
+ close(recv_msg.fd);
+ }
- case _NXT_PORT_MSG_DATA:
- if (nxt_slow_path(port_msg->mmap == 0)) {
- nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory",
- port_msg->stream);
+ while (recv_msg.incoming_buf != NULL) {
+ nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
+ }
- goto fail;
- }
+ if (recv_msg.process != NULL) {
+ nxt_unit_process_use(ctx, recv_msg.process, -1);
+ }
- if (nxt_slow_path(recv_msg.size < sizeof(nxt_unit_request_t))) {
- nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least "
- "%d expected", port_msg->stream, (int) recv_msg.size,
- (int) sizeof(nxt_unit_request_t));
+ return rc;
+}
- goto fail;
- }
- req_impl = nxt_unit_request_info_get(ctx);
- if (nxt_slow_path(req_impl == NULL)) {
- nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed",
- port_msg->stream);
+static int
+nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
+{
+ int nb;
+ nxt_unit_impl_t *lib;
+ nxt_unit_port_t new_port;
+ nxt_port_msg_new_port_t *new_port_msg;
- goto fail;
- }
+ if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) {
+ nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
+ "invalid message size (%d)",
+ recv_msg->stream, (int) recv_msg->size);
- req = &req_impl->req;
+ return NXT_UNIT_ERROR;
+ }
- req->request_port = *port_id;
+ if (nxt_slow_path(recv_msg->fd < 0)) {
+ nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
+ recv_msg->stream, recv_msg->fd);
- nxt_unit_port_id_init(&req->response_port, port_msg->pid,
- port_msg->reply_port);
+ return NXT_UNIT_ERROR;
+ }
- req->request = recv_msg.start;
+ new_port_msg = recv_msg->start;
- lnk = nxt_queue_first(&incoming_buf);
- b = nxt_container_of(lnk, nxt_unit_mmap_buf_t, link);
+ nxt_unit_debug(ctx, "#%"PRIu32": new_port: %d,%d fd %d",
+ recv_msg->stream, (int) new_port_msg->pid,
+ (int) new_port_msg->id, recv_msg->fd);
- req->request_buf = &b->buf;
- req->response = NULL;
- req->response_buf = NULL;
+ nb = 0;
- r = req->request;
+ if (nxt_slow_path(ioctl(recv_msg->fd, FIONBIO, &nb) == -1)) {
+ nxt_unit_alert(ctx, "#%"PRIu32": new_port: ioctl(%d, FIONBIO, 0) "
+ "failed: %s (%d)", recv_msg->fd, strerror(errno), errno);
- req->content_length = r->content_length;
+ return NXT_UNIT_ERROR;
+ }
- req->content_buf = req->request_buf;
- req->content_buf->free = nxt_unit_sptr_get(&r->preread_content);
+ nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
+ new_port_msg->id);
- /* Move process to req_impl. */
- req_impl->recv_msg = recv_msg;
+ new_port.in_fd = -1;
+ new_port.out_fd = recv_msg->fd;
+ new_port.data = NULL;
- recv_msg.process = NULL;
+ recv_msg->fd = -1;
- nxt_queue_init(&req_impl->outgoing_buf);
- nxt_queue_init(&req_impl->incoming_buf);
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
- nxt_queue_each(b, &incoming_buf, nxt_unit_mmap_buf_t, link)
- {
- b->req = req;
- } nxt_queue_loop;
+ return lib->callbacks.add_port(ctx, &new_port);
+}
- nxt_queue_add(&req_impl->incoming_buf, &incoming_buf);
- nxt_queue_init(&incoming_buf);
- req->response_max_fields = 0;
- req_impl->state = NXT_UNIT_RS_START;
+static int
+nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
+{
+ nxt_unit_impl_t *lib;
+ nxt_unit_request_t *r;
+ nxt_unit_mmap_buf_t *b;
+ nxt_unit_request_info_t *req;
+ nxt_unit_request_info_impl_t *req_impl;
- nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", port_msg->stream,
- (int) r->method_length, nxt_unit_sptr_get(&r->method),
- (int) r->target_length, nxt_unit_sptr_get(&r->target),
- (int) r->content_length);
+ if (nxt_slow_path(recv_msg->mmap == 0)) {
+ nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory",
+ recv_msg->stream);
- cb->request_handler(req);
+ return NXT_UNIT_ERROR;
+ }
- rc = NXT_UNIT_OK;
- break;
+ if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) {
+ nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least "
+ "%d expected", recv_msg->stream, (int) recv_msg->size,
+ (int) sizeof(nxt_unit_request_t));
- case _NXT_PORT_MSG_REMOVE_PID:
- if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
- nxt_unit_warn(ctx, "#%"PRIu32": remove_pid: invalid message size "
- "(%d != %d)", port_msg->stream, (int) recv_msg.size,
- (int) sizeof(pid));
+ return NXT_UNIT_ERROR;
+ }
- goto fail;
- }
+ req_impl = nxt_unit_request_info_get(ctx);
+ if (nxt_slow_path(req_impl == NULL)) {
+ nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed",
+ recv_msg->stream);
- memcpy(&pid, recv_msg.start, sizeof(pid));
+ return NXT_UNIT_ERROR;
+ }
- nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d",
- port_msg->stream, (int) pid);
+ req = &req_impl->req;
- cb->remove_pid(ctx, pid);
+ nxt_unit_port_id_init(&req->response_port, recv_msg->pid,
+ recv_msg->reply_port);
- rc = NXT_UNIT_OK;
- break;
+ req->request = recv_msg->start;
- default:
- nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d",
- port_msg->stream, (int) port_msg->type);
+ b = recv_msg->incoming_buf;
- goto fail;
+ req->request_buf = &b->buf;
+ req->response = NULL;
+ req->response_buf = NULL;
+
+ r = req->request;
+
+ req->content_length = r->content_length;
+
+ req->content_buf = req->request_buf;
+ req->content_buf->free = nxt_unit_sptr_get(&r->preread_content);
+
+ /* "Move" process reference to req_impl. */
+ req_impl->process = nxt_unit_msg_get_process(ctx, recv_msg);
+ if (nxt_slow_path(req_impl->process == NULL)) {
+ return NXT_UNIT_ERROR;
}
-fail:
+ recv_msg->process = NULL;
- if (fd != -1) {
- close(fd);
+ req_impl->stream = recv_msg->stream;
+
+ req_impl->outgoing_buf = NULL;
+
+ for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
+ b->req = req;
}
- if (port_msg->mmap) {
- nxt_queue_each(b, &incoming_buf, nxt_unit_mmap_buf_t, link)
- {
- nxt_unit_mmap_release(b->hdr, b->buf.start,
- b->buf.end - b->buf.start);
+ /* "Move" incoming buffer list to req_impl. */
+ req_impl->incoming_buf = recv_msg->incoming_buf;
+ req_impl->incoming_buf->prev = &req_impl->incoming_buf;
+ recv_msg->incoming_buf = NULL;
+
+ req->response_max_fields = 0;
+ req_impl->state = NXT_UNIT_RS_START;
+ req_impl->websocket = 0;
+
+ nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
+ (int) r->method_length, nxt_unit_sptr_get(&r->method),
+ (int) r->target_length, nxt_unit_sptr_get(&r->target),
+ (int) r->content_length);
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
+ lib->callbacks.request_handler(req);
- nxt_unit_mmap_buf_release(b);
- } nxt_queue_loop;
+ return NXT_UNIT_OK;
+}
+
+
+static int
+nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
+{
+ size_t hsize;
+ nxt_unit_impl_t *lib;
+ nxt_unit_mmap_buf_t *b;
+ nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_callbacks_t *cb;
+ nxt_unit_request_info_t *req;
+ nxt_unit_request_info_impl_t *req_impl;
+ nxt_unit_websocket_frame_impl_t *ws_impl;
+
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
+ req_impl = nxt_unit_request_hash_find(&ctx_impl->requests, recv_msg->stream,
+ recv_msg->last);
+ if (req_impl == NULL) {
+ return NXT_UNIT_OK;
}
- if (recv_msg.process != NULL) {
- nxt_unit_process_use(ctx, recv_msg.process, -1);
+ req = &req_impl->req;
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ cb = &lib->callbacks;
+
+ if (cb->websocket_handler && recv_msg->size >= 2) {
+ ws_impl = nxt_unit_websocket_frame_get(ctx);
+ if (nxt_slow_path(ws_impl == NULL)) {
+ nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed",
+ req_impl->stream);
+
+ return NXT_UNIT_ERROR;
+ }
+
+ ws_impl->ws.req = req;
+
+ ws_impl->buf = NULL;
+ ws_impl->retain_buf = NULL;
+
+ if (recv_msg->mmap) {
+ for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
+ b->req = req;
+ }
+
+ /* "Move" incoming buffer list to ws_impl. */
+ ws_impl->buf = recv_msg->incoming_buf;
+ ws_impl->buf->prev = &ws_impl->buf;
+ recv_msg->incoming_buf = NULL;
+
+ b = ws_impl->buf;
+
+ } else {
+ b = nxt_unit_mmap_buf_get(ctx);
+ if (nxt_slow_path(b == NULL)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ b->hdr = NULL;
+ b->req = req;
+ b->buf.start = recv_msg->start;
+ b->buf.free = b->buf.start;
+ b->buf.end = b->buf.start + recv_msg->size;
+
+ nxt_unit_mmap_buf_insert(&ws_impl->buf, b);
+ }
+
+ ws_impl->ws.header = (void *) b->buf.start;
+ ws_impl->ws.payload_len = nxt_websocket_frame_payload_len(
+ ws_impl->ws.header);
+
+ hsize = nxt_websocket_frame_header_size(ws_impl->ws.header);
+
+ if (ws_impl->ws.header->mask) {
+ ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4;
+
+ } else {
+ ws_impl->ws.mask = NULL;
+ }
+
+ b->buf.free += hsize;
+
+ ws_impl->ws.content_buf = &b->buf;
+ ws_impl->ws.content_length = ws_impl->ws.payload_len;
+
+ nxt_unit_req_debug(req, "websocket_handler: opcode=%d, "
+ "payload_len=%"PRIu64,
+ ws_impl->ws.header->opcode,
+ ws_impl->ws.payload_len);
+
+ cb->websocket_handler(&ws_impl->ws);
}
- return rc;
+ if (recv_msg->last) {
+ req_impl->websocket = 0;
+
+ if (cb->close_handler) {
+ nxt_unit_req_debug(req, "close_handler");
+
+ cb->close_handler(req);
+
+ } else {
+ nxt_unit_request_done(req, NXT_UNIT_ERROR);
+ }
+ }
+
+ return NXT_UNIT_OK;
}
@@ -815,9 +1059,7 @@ nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
static void
nxt_unit_request_info_release(nxt_unit_request_info_t *req)
{
- nxt_unit_mmap_buf_t *b;
nxt_unit_ctx_impl_t *ctx_impl;
- nxt_unit_recv_msg_t *recv_msg;
nxt_unit_request_info_impl_t *req_impl;
ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
@@ -826,30 +1068,31 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req)
req->response = NULL;
req->response_buf = NULL;
- recv_msg = &req_impl->recv_msg;
+ if (req_impl->process != NULL) {
+ nxt_unit_process_use(req->ctx, req_impl->process, -1);
- if (recv_msg->process != NULL) {
- nxt_unit_process_use(req->ctx, recv_msg->process, -1);
-
- recv_msg->process = NULL;
+ req_impl->process = NULL;
}
- nxt_queue_each(b, &req_impl->outgoing_buf, nxt_unit_mmap_buf_t, link) {
-
- nxt_unit_buf_free(&b->buf);
+ if (req_impl->websocket) {
+ nxt_unit_request_hash_find(&ctx_impl->requests, req_impl->stream, 1);
- } nxt_queue_loop;
-
- nxt_queue_each(b, &req_impl->incoming_buf, nxt_unit_mmap_buf_t, link) {
+ req_impl->websocket = 0;
+ }
- nxt_unit_mmap_release(b->hdr, b->buf.start, b->buf.end - b->buf.start);
- nxt_unit_mmap_buf_release(b);
+ while (req_impl->outgoing_buf != NULL) {
+ nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
+ }
- } nxt_queue_loop;
+ while (req_impl->incoming_buf != NULL) {
+ nxt_unit_mmap_buf_free(req_impl->incoming_buf);
+ }
nxt_queue_remove(&req_impl->link);
nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);
+
+ req_impl->state = NXT_UNIT_RS_RELEASED;
}
@@ -868,6 +1111,68 @@ nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
}
+static nxt_unit_websocket_frame_impl_t *
+nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx)
+{
+ nxt_queue_link_t *lnk;
+ nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_websocket_frame_impl_t *ws_impl;
+
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
+ if (nxt_queue_is_empty(&ctx_impl->free_ws)) {
+ ws_impl = malloc(sizeof(nxt_unit_websocket_frame_impl_t));
+ if (nxt_slow_path(ws_impl == NULL)) {
+ nxt_unit_warn(ctx, "websocket frame allocation failed");
+
+ return NULL;
+ }
+
+ } else {
+ lnk = nxt_queue_first(&ctx_impl->free_ws);
+ nxt_queue_remove(lnk);
+
+ ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link);
+ }
+
+ ws_impl->ctx_impl = ctx_impl;
+
+ return ws_impl;
+}
+
+
+static void
+nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws)
+{
+ nxt_unit_websocket_frame_impl_t *ws_impl;
+
+ ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
+
+ while (ws_impl->buf != NULL) {
+ nxt_unit_mmap_buf_free(ws_impl->buf);
+ }
+
+ ws->req = NULL;
+
+ if (ws_impl->retain_buf != NULL) {
+ free(ws_impl->retain_buf);
+
+ ws_impl->retain_buf = NULL;
+ }
+
+ nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link);
+}
+
+
+static void
+nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws_impl)
+{
+ nxt_queue_remove(&ws_impl->link);
+
+ free(ws_impl);
+}
+
+
uint16_t
nxt_unit_field_hash(const char *name, size_t name_length)
{
@@ -1275,6 +1580,10 @@ nxt_unit_response_send(nxt_unit_request_info_t *req)
return NXT_UNIT_ERROR;
}
+ if (req->request->websocket_handshake && req->response->status == 101) {
+ nxt_unit_response_upgrade(req);
+ }
+
nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes",
req->response->fields_count,
(int) (req->response_buf->free
@@ -1282,9 +1591,7 @@ nxt_unit_response_send(nxt_unit_request_info_t *req)
mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf);
- rc = nxt_unit_mmap_buf_send(req->ctx,
- req_impl->recv_msg.port_msg.stream,
- mmap_buf, 0);
+ rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 0);
if (nxt_fast_path(rc == NXT_UNIT_OK)) {
req->response = NULL;
req->response_buf = NULL;
@@ -1312,7 +1619,6 @@ nxt_unit_buf_t *
nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
{
int rc;
- nxt_unit_process_t *process;
nxt_unit_mmap_buf_t *mmap_buf;
nxt_unit_request_info_impl_t *req_impl;
@@ -1327,11 +1633,6 @@ nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
- process = nxt_unit_msg_get_process(req->ctx, &req_impl->recv_msg);
- if (nxt_slow_path(process == NULL)) {
- return NULL;
- }
-
mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
if (nxt_slow_path(mmap_buf == NULL)) {
return NULL;
@@ -1339,10 +1640,10 @@ nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
mmap_buf->req = req;
- nxt_queue_insert_tail(&req_impl->outgoing_buf, &mmap_buf->link);
+ nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf);
- rc = nxt_unit_get_outgoing_buf(req->ctx, process, &req->response_port,
- size, mmap_buf);
+ rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
+ &req->response_port, size, mmap_buf);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
nxt_unit_mmap_buf_release(mmap_buf);
@@ -1366,13 +1667,13 @@ nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
pthread_mutex_lock(&lib->mutex);
- recv_msg->process = nxt_unit_process_find(ctx, recv_msg->port_msg.pid, 0);
+ recv_msg->process = nxt_unit_process_find(ctx, recv_msg->pid, 0);
pthread_mutex_unlock(&lib->mutex);
if (recv_msg->process == NULL) {
nxt_unit_warn(ctx, "#%"PRIu32": process %d not found",
- recv_msg->port_msg.stream, (int) recv_msg->port_msg.pid);
+ recv_msg->stream, (int) recv_msg->pid);
}
return recv_msg->process;
@@ -1382,23 +1683,21 @@ nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
static nxt_unit_mmap_buf_t *
nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
{
- nxt_queue_link_t *lnk;
nxt_unit_mmap_buf_t *mmap_buf;
nxt_unit_ctx_impl_t *ctx_impl;
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
- if (nxt_queue_is_empty(&ctx_impl->free_buf)) {
+ if (ctx_impl->free_buf == NULL) {
mmap_buf = malloc(sizeof(nxt_unit_mmap_buf_t));
if (nxt_slow_path(mmap_buf == NULL)) {
nxt_unit_warn(ctx, "failed to allocate buf");
}
} else {
- lnk = nxt_queue_first(&ctx_impl->free_buf);
- nxt_queue_remove(lnk);
+ mmap_buf = ctx_impl->free_buf;
- mmap_buf = nxt_container_of(lnk, nxt_unit_mmap_buf_t, link);
+ nxt_unit_mmap_buf_remove(mmap_buf);
}
mmap_buf->ctx_impl = ctx_impl;
@@ -1410,9 +1709,91 @@ nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
static void
nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf)
{
- nxt_queue_remove(&mmap_buf->link);
+ nxt_unit_mmap_buf_remove(mmap_buf);
+
+ nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf);
+}
+
+
+typedef struct {
+ size_t len;
+ const char *str;
+} nxt_unit_str_t;
+
+
+#define nxt_unit_str(str) { nxt_length(str), str }
+
+
+int
+nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req)
+{
+ return req->request->websocket_handshake;
+}
+
+
+int
+nxt_unit_response_upgrade(nxt_unit_request_info_t *req)
+{
+ int rc;
+ nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_request_info_impl_t *req_impl;
+
+ req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
+
+ if (nxt_slow_path(req_impl->websocket != 0)) {
+ nxt_unit_req_debug(req, "upgrade: already upgraded");
+
+ return NXT_UNIT_OK;
+ }
+
+ if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
+ nxt_unit_req_warn(req, "upgrade: response is not initialized yet");
+
+ return NXT_UNIT_ERROR;
+ }
+
+ if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
+ nxt_unit_req_warn(req, "upgrade: response already sent");
+
+ return NXT_UNIT_ERROR;
+ }
+
+ ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
+
+ rc = nxt_unit_request_hash_add(&ctx_impl->requests, req_impl);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ nxt_unit_req_warn(req, "upgrade: failed to add request to hash");
+
+ return NXT_UNIT_ERROR;
+ }
+
+ req_impl->websocket = 1;
+
+ req->response->status = 101;
+
+ return NXT_UNIT_OK;
+}
+
+
+int
+nxt_unit_response_is_websocket(nxt_unit_request_info_t *req)
+{
+ nxt_unit_request_info_impl_t *req_impl;
- nxt_queue_insert_tail(&mmap_buf->ctx_impl->free_buf, &mmap_buf->link);
+ req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
+
+ return req_impl->websocket;
+}
+
+
+nxt_unit_request_info_t *
+nxt_unit_get_request_info_from_data(void *data)
+{
+ nxt_unit_request_info_impl_t *req_impl;
+
+ req_impl = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data);
+
+ return &req_impl->req;
}
@@ -1445,9 +1826,7 @@ nxt_unit_buf_send(nxt_unit_buf_t *buf)
}
if (nxt_fast_path(buf->free > buf->start)) {
- rc = nxt_unit_mmap_buf_send(req->ctx,
- req_impl->recv_msg.port_msg.stream,
- mmap_buf, 0);
+ rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 0);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
return rc;
}
@@ -1472,10 +1851,7 @@ nxt_unit_buf_send_done(nxt_unit_buf_t *buf)
req = mmap_buf->req;
req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
- rc = nxt_unit_mmap_buf_send(req->ctx,
- req_impl->recv_msg.port_msg.stream,
- mmap_buf, 1);
-
+ rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 1);
if (nxt_slow_path(rc == NXT_UNIT_OK)) {
nxt_unit_mmap_buf_release(mmap_buf);
@@ -1506,6 +1882,7 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
buf = &mmap_buf->buf;
+ hdr = mmap_buf->hdr;
m.mmap_msg.size = buf->free - buf->start;
@@ -1514,15 +1891,15 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
m.msg.reply_port = 0;
m.msg.type = _NXT_PORT_MSG_DATA;
m.msg.last = last != 0;
- m.msg.mmap = m.mmap_msg.size > 0;
+ m.msg.mmap = hdr != NULL && m.mmap_msg.size > 0;
m.msg.nf = 0;
m.msg.mf = 0;
m.msg.tracking = 0;
- hdr = mmap_buf->hdr;
-
- m.mmap_msg.mmap_id = hdr->id;
- m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr, (u_char *) buf->start);
+ if (hdr != NULL) {
+ m.mmap_msg.mmap_id = hdr->id;
+ m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr, (u_char *) buf->start);
+ }
nxt_unit_debug(ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
stream,
@@ -1531,14 +1908,13 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
(int) m.mmap_msg.size);
res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, &m,
- m.mmap_msg.size > 0 ? sizeof(m)
- : sizeof(m.msg),
+ m.msg.mmap ? sizeof(m) : sizeof(m.msg),
NULL, 0);
if (nxt_slow_path(res != sizeof(m))) {
return NXT_UNIT_ERROR;
}
- if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) {
+ if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE && hdr != NULL) {
last_used = (u_char *) buf->free - 1;
first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1;
@@ -1557,11 +1933,17 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
void
nxt_unit_buf_free(nxt_unit_buf_t *buf)
{
- nxt_unit_mmap_buf_t *mmap_buf;
+ nxt_unit_mmap_buf_free(nxt_container_of(buf, nxt_unit_mmap_buf_t, buf));
+}
- mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
- nxt_unit_mmap_release(mmap_buf->hdr, buf->start, buf->end - buf->start);
+static void
+nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf)
+{
+ if (nxt_fast_path(mmap_buf->hdr != NULL)) {
+ nxt_unit_mmap_release(mmap_buf->hdr, mmap_buf->buf.start,
+ mmap_buf->buf.end - mmap_buf->buf.start);
+ }
nxt_unit_mmap_buf_release(mmap_buf);
}
@@ -1570,26 +1952,15 @@ nxt_unit_buf_free(nxt_unit_buf_t *buf)
nxt_unit_buf_t *
nxt_unit_buf_next(nxt_unit_buf_t *buf)
{
- nxt_queue_link_t *lnk;
- nxt_unit_mmap_buf_t *mmap_buf;
- nxt_unit_request_info_impl_t *req_impl;
+ nxt_unit_mmap_buf_t *mmap_buf;
mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
- req_impl = nxt_container_of(mmap_buf->req, nxt_unit_request_info_impl_t,
- req);
- lnk = &mmap_buf->link;
-
- if (lnk == nxt_queue_last(&req_impl->incoming_buf)
- || lnk == nxt_queue_last(&req_impl->outgoing_buf))
- {
+ if (mmap_buf->next == NULL) {
return NULL;
}
- lnk = nxt_queue_next(lnk);
- mmap_buf = nxt_container_of(lnk, nxt_unit_mmap_buf_t, link);
-
- return &mmap_buf->buf;
+ return &mmap_buf->next->buf;
}
@@ -1614,7 +1985,6 @@ nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
int rc;
uint32_t part_size;
const char *part_start;
- nxt_unit_process_t *process;
nxt_unit_mmap_buf_t mmap_buf;
nxt_unit_request_info_impl_t *req_impl;
@@ -1641,16 +2011,12 @@ nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
part_start += part_size;
}
- process = nxt_unit_msg_get_process(req->ctx, &req_impl->recv_msg);
- if (nxt_slow_path(process == NULL)) {
- return NXT_UNIT_ERROR;
- }
-
while (size > 0) {
part_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
- rc = nxt_unit_get_outgoing_buf(req->ctx, process, &req->response_port,
- part_size, &mmap_buf);
+ rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
+ &req->response_port, part_size,
+ &mmap_buf);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
return rc;
}
@@ -1658,9 +2024,7 @@ nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free,
part_start, part_size);
- rc = nxt_unit_mmap_buf_send(req->ctx,
- req_impl->recv_msg.port_msg.stream,
- &mmap_buf, 0);
+ rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
nxt_unit_mmap_release(mmap_buf.hdr, mmap_buf.buf.start,
mmap_buf.buf.end - mmap_buf.buf.start);
@@ -1766,6 +2130,14 @@ nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
ssize_t
nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
{
+ return nxt_unit_buf_read(&req->content_buf, &req->content_length,
+ dst, size);
+}
+
+
+static ssize_t
+nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size)
+{
u_char *p;
size_t rest, copy, read;
nxt_unit_buf_t *buf;
@@ -1773,7 +2145,7 @@ nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
p = dst;
rest = size;
- buf = req->content_buf;
+ buf = *b;
while (buf != NULL) {
copy = buf->end - buf->free;
@@ -1795,11 +2167,11 @@ nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
buf = nxt_unit_buf_next(buf);
}
- req->content_buf = buf;
+ *b = buf;
read = size - rest;
- req->content_length -= read;
+ *len -= read;
return read;
}
@@ -1852,7 +2224,7 @@ skip_response_send:
lib = nxt_container_of(req->unit, nxt_unit_impl_t, unit);
- msg.stream = req_impl->recv_msg.port_msg.stream;
+ msg.stream = req_impl->stream;
msg.pid = lib->pid;
msg.reply_port = 0;
msg.type = (rc == NXT_UNIT_OK) ? _NXT_PORT_MSG_DATA
@@ -1874,6 +2246,162 @@ skip_response_send:
}
+int
+nxt_unit_websocket_send(nxt_unit_request_info_t *req, uint8_t opcode,
+ uint8_t last, const void *start, size_t size)
+{
+ const struct iovec iov = { (void *) start, size };
+
+ return nxt_unit_websocket_sendv(req, opcode, last, &iov, 1);
+}
+
+
+int
+nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
+ uint8_t last, const struct iovec *iov, int iovcnt)
+{
+ int i, rc;
+ size_t l, copy;
+ uint32_t payload_len, buf_size;
+ const uint8_t *b;
+ nxt_unit_buf_t *buf;
+ nxt_websocket_header_t *wh;
+
+ payload_len = 0;
+
+ for (i = 0; i < iovcnt; i++) {
+ payload_len += iov[i].iov_len;
+ }
+
+ buf_size = 10 + payload_len;
+
+ buf = nxt_unit_response_buf_alloc(req, nxt_min(buf_size,
+ PORT_MMAP_DATA_SIZE));
+ if (nxt_slow_path(buf == NULL)) {
+ nxt_unit_req_error(req, "Failed to allocate buf for content");
+
+ return NXT_UNIT_ERROR;
+ }
+
+ buf->start[0] = 0;
+ buf->start[1] = 0;
+
+ wh = (void *) buf->free;
+
+ buf->free = nxt_websocket_frame_init(wh, payload_len);
+ wh->fin = last;
+ wh->opcode = opcode;
+
+ for (i = 0; i < iovcnt; i++) {
+ b = iov[i].iov_base;
+ l = iov[i].iov_len;
+
+ while (l > 0) {
+ copy = buf->end - buf->free;
+ copy = nxt_min(l, copy);
+
+ buf->free = nxt_cpymem(buf->free, b, copy);
+ b += copy;
+ l -= copy;
+
+ if (l > 0) {
+ buf_size -= buf->end - buf->start;
+
+ rc = nxt_unit_buf_send(buf);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ nxt_unit_req_error(req, "Failed to send content");
+
+ return NXT_UNIT_ERROR;
+ }
+
+ buf = nxt_unit_response_buf_alloc(req, nxt_min(buf_size,
+ PORT_MMAP_DATA_SIZE));
+ if (nxt_slow_path(buf == NULL)) {
+ nxt_unit_req_error(req,
+ "Failed to allocate buf for content");
+
+ return NXT_UNIT_ERROR;
+ }
+ }
+ }
+ }
+
+ if (buf->free > buf->start) {
+ rc = nxt_unit_buf_send(buf);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ nxt_unit_req_error(req, "Failed to send content");
+ }
+ }
+
+ return rc;
+}
+
+
+ssize_t
+nxt_unit_websocket_read(nxt_unit_websocket_frame_t *ws, void *dst,
+ size_t size)
+{
+ ssize_t res;
+ uint8_t *b;
+ uint64_t i, d;
+
+ res = nxt_unit_buf_read(&ws->content_buf, &ws->content_length,
+ dst, size);
+
+ if (ws->mask == NULL) {
+ return res;
+ }
+
+ b = dst;
+ d = (ws->payload_len - ws->content_length - res) % 4;
+
+ for (i = 0; i < (uint64_t) res; i++) {
+ b[i] ^= ws->mask[ (i + d) % 4 ];
+ }
+
+ return res;
+}
+
+
+int
+nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws)
+{
+ char *b;
+ size_t size;
+ nxt_unit_websocket_frame_impl_t *ws_impl;
+
+ ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
+
+ if (ws_impl->retain_buf != NULL || ws_impl->buf->hdr != NULL) {
+ return NXT_UNIT_OK;
+ }
+
+ size = ws_impl->buf->buf.end - ws_impl->buf->buf.start;
+
+ b = malloc(size);
+ if (nxt_slow_path(b == NULL)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ memcpy(b, ws_impl->buf->buf.start, size);
+
+ ws_impl->buf->buf.start = b;
+ ws_impl->buf->buf.free = b;
+ ws_impl->buf->buf.end = b + size;
+
+ ws_impl->retain_buf = b;
+
+ return NXT_UNIT_OK;
+}
+
+
+void
+nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws)
+{
+ nxt_unit_websocket_frame_release(ws);
+}
+
+
static nxt_port_mmap_header_t *
nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
nxt_unit_port_id_t *port_id, nxt_chunk_id_t *c, int n)
@@ -2355,7 +2883,7 @@ nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
if (recv_msg->size < (int) sizeof(nxt_port_mmap_tracking_msg_t)) {
nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: too small message (%d)",
- recv_msg->port_msg.stream, (int) recv_msg->size);
+ recv_msg->stream, (int) recv_msg->size);
return 0;
}
@@ -2378,18 +2906,18 @@ nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: "
"invalid mmap id %d,%"PRIu32,
- recv_msg->port_msg.stream,
- (int) process->pid, tracking_msg->mmap_id);
+ recv_msg->stream, (int) process->pid,
+ tracking_msg->mmap_id);
return 0;
}
c = tracking_msg->tracking_id;
- rc = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->port_msg.stream, 0);
+ rc = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0);
if (rc == 0) {
nxt_unit_debug(ctx, "#%"PRIu32": tracking cancelled",
- recv_msg->port_msg.stream);
+ recv_msg->stream);
nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
}
@@ -2401,19 +2929,18 @@ nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
static int
-nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
- nxt_queue_t *incoming_buf)
+nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
void *start;
uint32_t size;
nxt_unit_process_t *process;
- nxt_unit_mmap_buf_t *b;
+ nxt_unit_mmap_buf_t *b, **incoming_tail;
nxt_port_mmap_msg_t *mmap_msg, *end;
nxt_port_mmap_header_t *hdr;
if (nxt_slow_path(recv_msg->size < sizeof(nxt_port_mmap_msg_t))) {
nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: too small message (%d)",
- recv_msg->port_msg.stream, (int) recv_msg->size);
+ recv_msg->stream, (int) recv_msg->size);
return NXT_UNIT_ERROR;
}
@@ -2426,6 +2953,8 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
mmap_msg = recv_msg->start;
end = nxt_pointer_to(recv_msg->start, recv_msg->size);
+ incoming_tail = &recv_msg->incoming_buf;
+
pthread_mutex_lock(&process->incoming.mutex);
for (; mmap_msg < end; mmap_msg++) {
@@ -2435,8 +2964,8 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: "
"invalid mmap id %d,%"PRIu32,
- recv_msg->port_msg.stream,
- (int) process->pid, mmap_msg->mmap_id);
+ recv_msg->stream, (int) process->pid,
+ mmap_msg->mmap_id);
return NXT_UNIT_ERROR;
}
@@ -2453,16 +2982,16 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
if (nxt_slow_path(b == NULL)) {
pthread_mutex_unlock(&process->incoming.mutex);
- nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: "
- "failed to allocate buf",
- recv_msg->port_msg.stream);
+ nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf",
+ recv_msg->stream);
nxt_unit_mmap_release(hdr, start, size);
return NXT_UNIT_ERROR;
}
- nxt_queue_insert_tail(incoming_buf, &b->link);
+ nxt_unit_mmap_buf_insert(incoming_tail, b);
+ incoming_tail = &b->next;
b->buf.start = start;
b->buf.free = start;
@@ -2470,7 +2999,7 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
b->hdr = hdr;
nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)",
- recv_msg->port_msg.stream,
+ recv_msg->stream,
start, (int) size,
(int) hdr->src_pid, (int) hdr->dst_pid,
(int) hdr->id, (int) mmap_msg->chunk_id,
@@ -2685,6 +3214,11 @@ nxt_unit_run_once(nxt_unit_ctx_t *ctx)
if (nxt_fast_path(rsize > 0)) {
rc = nxt_unit_process_msg(ctx, &ctx_impl->read_port_id, buf, rsize,
oob, sizeof(oob));
+
+#if (NXT_DEBUG)
+ memset(buf, 0xAC, rsize);
+#endif
+
} else {
rc = NXT_UNIT_ERROR;
}
@@ -2775,10 +3309,11 @@ nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
void
nxt_unit_ctx_free(nxt_unit_ctx_t *ctx)
{
- nxt_unit_impl_t *lib;
- nxt_unit_ctx_impl_t *ctx_impl;
- nxt_unit_mmap_buf_t *mmap_buf;
- nxt_unit_request_info_impl_t *req_impl;
+ nxt_unit_impl_t *lib;
+ nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_mmap_buf_t *mmap_buf;
+ nxt_unit_request_info_impl_t *req_impl;
+ nxt_unit_websocket_frame_impl_t *ws_impl;
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
@@ -2792,15 +3327,14 @@ nxt_unit_ctx_free(nxt_unit_ctx_t *ctx)
} nxt_queue_loop;
- nxt_queue_remove(&ctx_impl->ctx_buf[0].link);
- nxt_queue_remove(&ctx_impl->ctx_buf[1].link);
-
- nxt_queue_each(mmap_buf, &ctx_impl->free_buf, nxt_unit_mmap_buf_t, link) {
+ nxt_unit_mmap_buf_remove(&ctx_impl->ctx_buf[0]);
+ nxt_unit_mmap_buf_remove(&ctx_impl->ctx_buf[1]);
- nxt_queue_remove(&mmap_buf->link);
+ while (ctx_impl->free_buf != NULL) {
+ mmap_buf = ctx_impl->free_buf;
+ nxt_unit_mmap_buf_remove(mmap_buf);
free(mmap_buf);
-
- } nxt_queue_loop;
+ }
nxt_queue_each(req_impl, &ctx_impl->free_req,
nxt_unit_request_info_impl_t, link)
@@ -2809,6 +3343,13 @@ nxt_unit_ctx_free(nxt_unit_ctx_t *ctx)
} nxt_queue_loop;
+ nxt_queue_each(ws_impl, &ctx_impl->free_ws,
+ nxt_unit_websocket_frame_impl_t, link)
+ {
+ nxt_unit_websocket_frame_free(ws_impl);
+
+ } nxt_queue_loop;
+
nxt_queue_remove(&ctx_impl->link);
if (ctx_impl != &lib->main_ctx) {
@@ -3454,6 +3995,83 @@ nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id,
}
+static nxt_int_t
+nxt_unit_request_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
+{
+ return NXT_OK;
+}
+
+
+static const nxt_lvlhsh_proto_t lvlhsh_requests_proto nxt_aligned(64) = {
+ NXT_LVLHSH_DEFAULT,
+ nxt_unit_request_hash_test,
+ nxt_lvlhsh_alloc,
+ nxt_lvlhsh_free,
+};
+
+
+static int
+nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash,
+ nxt_unit_request_info_impl_t *req_impl)
+{
+ uint32_t *stream;
+ nxt_int_t res;
+ nxt_lvlhsh_query_t lhq;
+
+ stream = &req_impl->stream;
+
+ lhq.key_hash = nxt_murmur_hash2(stream, sizeof(*stream));
+ lhq.key.length = sizeof(*stream);
+ lhq.key.start = (u_char *) stream;
+ lhq.proto = &lvlhsh_requests_proto;
+ lhq.pool = NULL;
+ lhq.replace = 0;
+ lhq.value = req_impl;
+
+ res = nxt_lvlhsh_insert(request_hash, &lhq);
+
+ switch (res) {
+
+ case NXT_OK:
+ return NXT_UNIT_OK;
+
+ default:
+ return NXT_UNIT_ERROR;
+ }
+}
+
+
+static nxt_unit_request_info_impl_t *
+nxt_unit_request_hash_find(nxt_lvlhsh_t *request_hash, uint32_t stream,
+ int remove)
+{
+ nxt_int_t res;
+ nxt_lvlhsh_query_t lhq;
+
+ lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(stream));
+ lhq.key.length = sizeof(stream);
+ lhq.key.start = (u_char *) &stream;
+ lhq.proto = &lvlhsh_requests_proto;
+ lhq.pool = NULL;
+
+ if (remove) {
+ res = nxt_lvlhsh_delete(request_hash, &lhq);
+
+ } else {
+ res = nxt_lvlhsh_find(request_hash, &lhq);
+ }
+
+ switch (res) {
+
+ case NXT_OK:
+ return lhq.value;
+
+ default:
+ return NULL;
+ }
+}
+
+
void
nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char *fmt, ...)
{
@@ -3526,8 +4144,7 @@ nxt_unit_req_log(nxt_unit_request_info_t *req, int level, const char *fmt, ...)
if (nxt_fast_path(req != NULL)) {
req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
- p += snprintf(p, end - p,
- "#%"PRIu32": ", req_impl->recv_msg.port_msg.stream);
+ p += snprintf(p, end - p, "#%"PRIu32": ", req_impl->stream);
}
va_start(ap, fmt);
diff --git a/src/nxt_unit.h b/src/nxt_unit.h
index 532de20d..3471a758 100644
--- a/src/nxt_unit.h
+++ b/src/nxt_unit.h
@@ -9,6 +9,7 @@
#include <inttypes.h>
#include <sys/types.h>
+#include <sys/uio.h>
#include <string.h>
#include "nxt_version.h"
@@ -106,17 +107,24 @@ struct nxt_unit_request_info_s {
void *data;
};
+
/*
* Set of application-specific callbacks. Application may leave all optional
* callbacks as NULL.
*/
struct nxt_unit_callbacks_s {
/*
- * Process request data. Unlike all other callback, this callback
+ * Process request. Unlike all other callback, this callback
* need to be defined by application.
*/
void (*request_handler)(nxt_unit_request_info_t *req);
+ /* Process websocket frame. */
+ void (*websocket_handler)(nxt_unit_websocket_frame_t *ws);
+
+ /* Connection closed. */
+ void (*close_handler)(nxt_unit_request_info_t *req);
+
/* Add new Unit port to communicate with process pid. Optional. */
int (*add_port)(nxt_unit_ctx_t *, nxt_unit_port_t *port);
@@ -293,6 +301,14 @@ int nxt_unit_response_is_sent(nxt_unit_request_info_t *req);
nxt_unit_buf_t *nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req,
uint32_t size);
+int nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req);
+
+int nxt_unit_response_upgrade(nxt_unit_request_info_t *req);
+
+int nxt_unit_response_is_websocket(nxt_unit_request_info_t *req);
+
+nxt_unit_request_info_t *nxt_unit_get_request_info_from_data(void *data);
+
int nxt_unit_buf_send(nxt_unit_buf_t *buf);
void nxt_unit_buf_free(nxt_unit_buf_t *buf);
@@ -315,6 +331,20 @@ ssize_t nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst,
void nxt_unit_request_done(nxt_unit_request_info_t *req, int rc);
+int nxt_unit_websocket_send(nxt_unit_request_info_t *req, uint8_t opcode,
+ uint8_t last, const void *start, size_t size);
+
+int nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
+ uint8_t last, const struct iovec *iov, int iovcnt);
+
+ssize_t nxt_unit_websocket_read(nxt_unit_websocket_frame_t *ws, void *dst,
+ size_t size);
+
+int nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws);
+
+void nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws);
+
+
void nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char* fmt, ...);
void nxt_unit_req_log(nxt_unit_request_info_t *req, int level,
diff --git a/src/nxt_unit_request.h b/src/nxt_unit_request.h
index 2207cefa..52017a42 100644
--- a/src/nxt_unit_request.h
+++ b/src/nxt_unit_request.h
@@ -20,6 +20,7 @@ struct nxt_unit_request_s {
uint8_t remote_length;
uint8_t local_length;
uint8_t tls;
+ uint8_t websocket_handshake;
uint32_t server_name_length;
uint32_t target_length;
uint32_t path_length;
diff --git a/src/nxt_unit_typedefs.h b/src/nxt_unit_typedefs.h
index 871ce25b..26e54f91 100644
--- a/src/nxt_unit_typedefs.h
+++ b/src/nxt_unit_typedefs.h
@@ -7,19 +7,20 @@
#define _NXT_UNIT_TYPEDEFS_H_INCLUDED_
-typedef struct nxt_unit_s nxt_unit_t;
-typedef struct nxt_unit_ctx_s nxt_unit_ctx_t;
-typedef struct nxt_unit_port_id_s nxt_unit_port_id_t;
-typedef struct nxt_unit_port_s nxt_unit_port_t;
-typedef struct nxt_unit_buf_s nxt_unit_buf_t;
-typedef struct nxt_unit_request_info_s nxt_unit_request_info_t;
-typedef struct nxt_unit_callbacks_s nxt_unit_callbacks_t;
-typedef struct nxt_unit_init_s nxt_unit_init_t;
-typedef union nxt_unit_sptr_u nxt_unit_sptr_t;
-typedef struct nxt_unit_field_s nxt_unit_field_t;
-typedef struct nxt_unit_request_s nxt_unit_request_t;
-typedef struct nxt_unit_response_s nxt_unit_response_t;
-typedef struct nxt_unit_read_info_s nxt_unit_read_info_t;
+typedef struct nxt_unit_s nxt_unit_t;
+typedef struct nxt_unit_ctx_s nxt_unit_ctx_t;
+typedef struct nxt_unit_port_id_s nxt_unit_port_id_t;
+typedef struct nxt_unit_port_s nxt_unit_port_t;
+typedef struct nxt_unit_buf_s nxt_unit_buf_t;
+typedef struct nxt_unit_request_info_s nxt_unit_request_info_t;
+typedef struct nxt_unit_callbacks_s nxt_unit_callbacks_t;
+typedef struct nxt_unit_init_s nxt_unit_init_t;
+typedef union nxt_unit_sptr_u nxt_unit_sptr_t;
+typedef struct nxt_unit_field_s nxt_unit_field_t;
+typedef struct nxt_unit_request_s nxt_unit_request_t;
+typedef struct nxt_unit_response_s nxt_unit_response_t;
+typedef struct nxt_unit_read_info_s nxt_unit_read_info_t;
+typedef struct nxt_unit_websocket_frame_s nxt_unit_websocket_frame_t;
#endif /* _NXT_UNIT_TYPEDEFS_H_INCLUDED_ */
diff --git a/src/nxt_unit_websocket.h b/src/nxt_unit_websocket.h
new file mode 100644
index 00000000..beb2536e
--- /dev/null
+++ b/src/nxt_unit_websocket.h
@@ -0,0 +1,27 @@
+
+/*
+ * Copyright (C) NGINX, Inc.
+ */
+
+#ifndef _NXT_UNIT_WEBSOCKET_H_INCLUDED_
+#define _NXT_UNIT_WEBSOCKET_H_INCLUDED_
+
+#include <inttypes.h>
+
+#include "nxt_unit_typedefs.h"
+#include "nxt_websocket_header.h"
+
+
+struct nxt_unit_websocket_frame_s {
+ nxt_unit_request_info_t *req;
+
+ uint64_t payload_len;
+ nxt_websocket_header_t *header;
+ uint8_t *mask;
+
+ nxt_unit_buf_t *content_buf;
+ uint64_t content_length;
+};
+
+
+#endif /* _NXT_UNIT_WEBSOCKET_H_INCLUDED_ */
diff --git a/src/nxt_websocket.c b/src/nxt_websocket.c
new file mode 100644
index 00000000..9a099760
--- /dev/null
+++ b/src/nxt_websocket.c
@@ -0,0 +1,122 @@
+
+/*
+ * Copyright (C) NGINX, Inc.
+ */
+
+#include <nxt_main.h>
+#include <nxt_websocket.h>
+#include <nxt_websocket_header.h>
+
+
+nxt_inline uint16_t
+nxt_ntoh16(const uint8_t *b)
+{
+ return ((uint16_t) b[0]) << 8 | ((uint16_t) b[1]);
+}
+
+
+nxt_inline void
+nxt_hton16(uint8_t *b, uint16_t v)
+{
+ b[0] = (v >> 8);
+ b[1] = (v & 0xFFu);
+}
+
+
+nxt_inline uint64_t
+nxt_ntoh64(const uint8_t *b)
+{
+ return ((uint64_t) b[0]) << 56
+ | ((uint64_t) b[1]) << 48
+ | ((uint64_t) b[2]) << 40
+ | ((uint64_t) b[3]) << 32
+ | ((uint64_t) b[4]) << 24
+ | ((uint64_t) b[5]) << 16
+ | ((uint64_t) b[6]) << 8
+ | ((uint64_t) b[7]);
+}
+
+
+nxt_inline void
+nxt_hton64(uint8_t *b, uint64_t v)
+{
+ b[0] = (v >> 56);
+ b[1] = (v >> 48) & 0xFFu;
+ b[2] = (v >> 40) & 0xFFu;
+ b[3] = (v >> 32) & 0xFFu;
+ b[4] = (v >> 24) & 0xFFu;
+ b[5] = (v >> 16) & 0xFFu;
+ b[6] = (v >> 8) & 0xFFu;
+ b[7] = v & 0xFFu;
+}
+
+
+size_t
+nxt_websocket_frame_header_size(const void *data)
+{
+ size_t res;
+ uint64_t p;
+ const nxt_websocket_header_t *h;
+
+ h = data;
+ p = h->payload_len;
+
+ res = 2;
+
+ if (p == 126) {
+ res += 2;
+ } else if (p == 127) {
+ res += 8;
+ }
+
+ if (h->mask) {
+ res += 4;
+ }
+
+ return res;
+}
+
+
+uint64_t
+nxt_websocket_frame_payload_len(const void *data)
+{
+ uint64_t p;
+ const nxt_websocket_header_t *h;
+
+ h = data;
+ p = h->payload_len;
+
+ if (p == 126) {
+ p = nxt_ntoh16(h->payload_len_);
+ } else if (p == 127) {
+ p = nxt_ntoh64(h->payload_len_);
+ }
+
+ return p;
+}
+
+
+void *
+nxt_websocket_frame_init(void *data, uint64_t payload_len)
+{
+ uint8_t *p;
+ nxt_websocket_header_t *h;
+
+ h = data;
+ p = data;
+
+ if (payload_len < 126) {
+ h->payload_len = payload_len;
+ return p + 2;
+ }
+
+ if (payload_len < 65536) {
+ h->payload_len = 126;
+ nxt_hton16(h->payload_len_, payload_len);
+ return p + 4;
+ }
+
+ h->payload_len = 127;
+ nxt_hton64(h->payload_len_, payload_len);
+ return p + 10;
+}
diff --git a/src/nxt_websocket.h b/src/nxt_websocket.h
new file mode 100644
index 00000000..499a3268
--- /dev/null
+++ b/src/nxt_websocket.h
@@ -0,0 +1,21 @@
+
+/*
+ * Copyright (C) NGINX, Inc.
+ */
+
+#ifndef _NXT_WEBSOCKET_H_INCLUDED_
+#define _NXT_WEBSOCKET_H_INCLUDED_
+
+
+enum {
+ NXT_WEBSOCKET_ACCEPT_SIZE = 28,
+};
+
+
+NXT_EXPORT size_t nxt_websocket_frame_header_size(const void *data);
+NXT_EXPORT uint64_t nxt_websocket_frame_payload_len(const void *data);
+NXT_EXPORT void *nxt_websocket_frame_init(void *data, uint64_t payload_len);
+NXT_EXPORT void nxt_websocket_accept(u_char *accept, const void *key);
+
+
+#endif /* _NXT_WEBSOCKET_H_INCLUDED_ */
diff --git a/src/nxt_websocket_accept.c b/src/nxt_websocket_accept.c
new file mode 100644
index 00000000..05cbcb56
--- /dev/null
+++ b/src/nxt_websocket_accept.c
@@ -0,0 +1,68 @@
+
+/*
+ * Copyright (C) NGINX, Inc.
+ */
+
+#include <nxt_main.h>
+#include <nxt_websocket.h>
+#include <nxt_sha1.h>
+
+
+static void
+nxt_websocket_base64_encode(u_char *d, const uint8_t *s, size_t len)
+{
+ u_char c0, c1, c2;
+ static u_char basis[] =
+ "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
+
+ while (len > 2) {
+ c0 = s[0];
+ c1 = s[1];
+ c2 = s[2];
+
+ *d++ = basis[c0 >> 2];
+ *d++ = basis[((c0 & 0x03) << 4) | (c1 >> 4)];
+ *d++ = basis[((c1 & 0x0f) << 2) | (c2 >> 6)];
+ *d++ = basis[c2 & 0x3f];
+
+ s += 3;
+ len -= 3;
+ }
+
+ if (len > 0) {
+ c0 = s[0];
+ *d++ = basis[c0 >> 2];
+
+ if (len == 1) {
+ *d++ = basis[(c0 & 0x03) << 4];
+ *d++ = '=';
+ *d++ = '=';
+
+ } else {
+ c1 = s[1];
+
+ *d++ = basis[((c0 & 0x03) << 4) | (c1 >> 4)];
+ *d++ = basis[(c1 & 0x0f) << 2];
+
+ *d++ = '=';
+ }
+ }
+}
+
+
+void
+nxt_websocket_accept(u_char *accept, const void *key)
+{
+ u_char bin_accept[20];
+ nxt_sha1_t ctx;
+ static const char accept_guid[] = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
+
+ nxt_sha1_init(&ctx);
+ nxt_sha1_update(&ctx, key, 24);
+ nxt_sha1_update(&ctx, accept_guid, nxt_length(accept_guid));
+ nxt_sha1_final(bin_accept, &ctx);
+
+ nxt_websocket_base64_encode(accept, bin_accept, sizeof(bin_accept));
+}
+
+
diff --git a/src/nxt_websocket_header.h b/src/nxt_websocket_header.h
new file mode 100644
index 00000000..f75dfacd
--- /dev/null
+++ b/src/nxt_websocket_header.h
@@ -0,0 +1,68 @@
+
+/*
+ * Copyright (C) NGINX, Inc.
+ */
+
+#ifndef _NXT_WEBSOCKET_HEADER_H_INCLUDED_
+#define _NXT_WEBSOCKET_HEADER_H_INCLUDED_
+
+#include <netinet/in.h>
+
+
+typedef struct {
+#if (BYTE_ORDER == BIG_ENDIAN)
+ uint8_t fin:1;
+ uint8_t rsv1:1;
+ uint8_t rsv2:1;
+ uint8_t rsv3:1;
+ uint8_t opcode:4;
+
+ uint8_t mask:1;
+ uint8_t payload_len:7;
+#endif
+
+#if (BYTE_ORDER == LITTLE_ENDIAN)
+ uint8_t opcode:4;
+ uint8_t rsv3:1;
+ uint8_t rsv2:1;
+ uint8_t rsv1:1;
+ uint8_t fin:1;
+
+ uint8_t payload_len:7;
+ uint8_t mask:1;
+#endif
+
+ uint8_t payload_len_[8];
+} nxt_websocket_header_t;
+
+
+enum {
+ NXT_WEBSOCKET_OP_CONT = 0x00,
+ NXT_WEBSOCKET_OP_TEXT = 0x01,
+ NXT_WEBSOCKET_OP_BINARY = 0x02,
+ NXT_WEBSOCKET_OP_CLOSE = 0x08,
+ NXT_WEBSOCKET_OP_PING = 0x09,
+ NXT_WEBSOCKET_OP_PONG = 0x0A,
+
+ NXT_WEBSOCKET_OP_CTRL = 0x08,
+};
+
+
+enum {
+ NXT_WEBSOCKET_CR_NORMAL = 1000,
+ NXT_WEBSOCKET_CR_GOING_AWAY = 1001,
+ NXT_WEBSOCKET_CR_PROTOCOL_ERROR = 1002,
+ NXT_WEBSOCKET_CR_UNPROCESSABLE_INPUT = 1003,
+ NXT_WEBSOCKET_CR_RESERVED = 1004,
+ NXT_WEBSOCKET_CR_NOT_PROVIDED = 1005,
+ NXT_WEBSOCKET_CR_ABNORMAL = 1006,
+ NXT_WEBSOCKET_CR_INVALID_DATA = 1007,
+ NXT_WEBSOCKET_CR_POLICY_VIOLATION = 1008,
+ NXT_WEBSOCKET_CR_MESSAGE_TOO_BIG = 1009,
+ NXT_WEBSOCKET_CR_EXTENSION_REQUIRED = 1010,
+ NXT_WEBSOCKET_CR_INTERNAL_SERVER_ERROR = 1011,
+ NXT_WEBSOCKET_CR_TLS_HANDSHAKE_FAILED = 1015,
+};
+
+
+#endif /* _NXT_WEBSOCKET_HEADER_H_INCLUDED_ */
diff --git a/src/test/nxt_unit_websocket_chat.c b/src/test/nxt_unit_websocket_chat.c
new file mode 100644
index 00000000..ecc9a243
--- /dev/null
+++ b/src/test/nxt_unit_websocket_chat.c
@@ -0,0 +1,348 @@
+
+/*
+ * Copyright (C) NGINX, Inc.
+ */
+
+#include <fcntl.h>
+#include <stdio.h>
+#include <string.h>
+#include <errno.h>
+
+#include <sys/mman.h>
+#include <sys/stat.h>
+
+#include <nxt_unit.h>
+#include <nxt_unit_request.h>
+#include <nxt_clang.h>
+#include <nxt_websocket.h>
+#include <nxt_unit_websocket.h>
+#include <nxt_main.h>
+
+
+#define CONTENT_TYPE "Content-Type"
+#define CONTENT_LENGTH "Content-Length"
+#define TEXT_HTML "text/html"
+
+typedef struct {
+ nxt_queue_link_t link;
+ int id;
+} ws_chat_request_data_t;
+
+
+static int ws_chat_root(nxt_unit_request_info_t *req);
+static void ws_chat_broadcast(const void *buf, size_t size);
+
+
+static const char ws_chat_index_html[];
+static const int ws_chat_index_html_size;
+
+static char ws_chat_index_content_length[34];
+static int ws_chat_index_content_length_size;
+
+static nxt_queue_t ws_chat_sessions;
+static int ws_chat_next_id = 0;
+
+
+static void
+ws_chat_request_handler(nxt_unit_request_info_t *req)
+{
+ static char buf[1024];
+ int buf_size;
+ int rc = NXT_UNIT_OK;
+ nxt_unit_request_t *r;
+ ws_chat_request_data_t *data;
+
+ r = req->request;
+
+ const char* target = nxt_unit_sptr_get(&r->target);
+
+ if (strcmp(target, "/") == 0) {
+ rc = ws_chat_root(req);
+ goto fail;
+ }
+
+ if (strcmp(target, "/chat") == 0) {
+ if (!nxt_unit_request_is_websocket_handshake(req)) {
+ goto notfound;
+ }
+
+ rc = nxt_unit_response_init(req, 101, 0, 0);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ goto fail;
+ }
+
+ data = req->data;
+ nxt_queue_insert_tail(&ws_chat_sessions, &data->link);
+
+ data->id = ws_chat_next_id++;
+
+ nxt_unit_response_upgrade(req);
+ nxt_unit_response_send(req);
+
+
+ buf_size = snprintf(buf, sizeof(buf), "Guest #%d has joined.", data->id);
+
+ ws_chat_broadcast(buf, buf_size);
+
+ return;
+ }
+
+notfound:
+
+ rc = nxt_unit_response_init(req, 404, 0, 0);
+
+fail:
+
+ nxt_unit_request_done(req, rc);
+}
+
+
+static int
+ws_chat_root(nxt_unit_request_info_t *req)
+{
+ int rc;
+
+ rc = nxt_unit_response_init(req, 200 /* Status code. */,
+ 2 /* Number of response headers. */,
+ nxt_length(CONTENT_TYPE) + 1
+ + nxt_length(TEXT_HTML) + 1
+ + nxt_length(CONTENT_LENGTH) + 1
+ + ws_chat_index_content_length_size + 1
+ + ws_chat_index_html_size);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ return rc;
+ }
+
+ rc = nxt_unit_response_add_field(req,
+ CONTENT_TYPE, nxt_length(CONTENT_TYPE),
+ TEXT_HTML, nxt_length(TEXT_HTML));
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ return rc;
+ }
+
+ rc = nxt_unit_response_add_field(req,
+ CONTENT_LENGTH, nxt_length(CONTENT_LENGTH),
+ ws_chat_index_content_length,
+ ws_chat_index_content_length_size);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ return rc;
+ }
+
+ rc = nxt_unit_response_add_content(req, ws_chat_index_html,
+ ws_chat_index_html_size);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ return rc;
+ }
+
+ return nxt_unit_response_send(req);
+}
+
+
+static void
+ws_chat_broadcast(const void *buf, size_t size)
+{
+ ws_chat_request_data_t *data;
+ nxt_unit_request_info_t *req;
+
+ nxt_unit_debug(NULL, "broadcast: %s", buf);
+
+ nxt_queue_each(data, &ws_chat_sessions, ws_chat_request_data_t, link) {
+
+ req = nxt_unit_get_request_info_from_data(data);
+
+ nxt_unit_req_debug(req, "broadcast: %s", buf);
+
+ nxt_unit_websocket_send(req, NXT_WEBSOCKET_OP_TEXT, 1, buf, size);
+ } nxt_queue_loop;
+}
+
+
+static void
+ws_chat_websocket_handler(nxt_unit_websocket_frame_t *ws)
+{
+ int buf_size;
+ static char buf[1024];
+ ws_chat_request_data_t *data;
+
+ if (ws->header->opcode != NXT_WEBSOCKET_OP_TEXT) {
+ return;
+ }
+
+ data = ws->req->data;
+
+ buf_size = snprintf(buf, sizeof(buf), "Guest #%d: ", data->id);
+
+ buf_size += nxt_unit_websocket_read(ws, buf + buf_size,
+ nxt_min(sizeof(buf),
+ ws->content_length));
+
+ ws_chat_broadcast(buf, buf_size);
+
+ nxt_unit_websocket_done(ws);
+}
+
+
+static void
+ws_chat_close_handler(nxt_unit_request_info_t *req)
+{
+ int buf_size;
+ static char buf[1024];
+ ws_chat_request_data_t *data;
+
+ data = req->data;
+ buf_size = snprintf(buf, sizeof(buf), "Guest #%d has disconnected.",
+ data->id);
+
+ nxt_queue_remove(&data->link);
+ nxt_unit_request_done(req, NXT_UNIT_OK);
+
+ ws_chat_broadcast(buf, buf_size);
+}
+
+
+int
+main()
+{
+ nxt_unit_ctx_t *ctx;
+ nxt_unit_init_t init;
+
+ ws_chat_index_content_length_size =
+ snprintf(ws_chat_index_content_length,
+ sizeof(ws_chat_index_content_length), "%d",
+ ws_chat_index_html_size);
+
+ nxt_queue_init(&ws_chat_sessions);
+
+ memset(&init, 0, sizeof(nxt_unit_init_t));
+
+ init.callbacks.request_handler = ws_chat_request_handler;
+ init.callbacks.websocket_handler = ws_chat_websocket_handler;
+ init.callbacks.close_handler = ws_chat_close_handler;
+
+ init.request_data_size = sizeof(ws_chat_request_data_t);
+
+ ctx = nxt_unit_init(&init);
+ if (ctx == NULL) {
+ return 1;
+ }
+
+ nxt_unit_run(ctx);
+
+ nxt_unit_done(ctx);
+
+ return 0;
+}
+
+
+static const char ws_chat_index_html[] =
+"<html>\n"
+"<head>\n"
+" <title>WebSocket Chat Examples</title>\n"
+" <style type=\"text/css\">\n"
+" input#chat {\n"
+" width: 410px\n"
+" }\n"
+"\n"
+" #container {\n"
+" width: 400px;\n"
+" }\n"
+"\n"
+" #console {\n"
+" border: 1px solid #CCCCCC;\n"
+" border-right-color: #999999;\n"
+" border-bottom-color: #999999;\n"
+" height: 170px;\n"
+" overflow-y: scroll;\n"
+" padding: 5px;\n"
+" width: 100%;\n"
+" }\n"
+"\n"
+" #console p {\n"
+" padding: 0;\n"
+" margin: 0;\n"
+" }\n"
+" </style>\n"
+" <script>\n"
+" \"use strict\";\n"
+"\n"
+" var Chat = {};\n"
+"\n"
+" Chat.socket = null;\n"
+"\n"
+" Chat.connect = (function(host) {\n"
+" if ('WebSocket' in window) {\n"
+" Chat.socket = new WebSocket(host);\n"
+" } else if ('MozWebSocket' in window) {\n"
+" Chat.socket = new MozWebSocket(host);\n"
+" } else {\n"
+" Console.log('Error: WebSocket is not supported by this browser.');\n"
+" return;\n"
+" }\n"
+"\n"
+" Chat.socket.onopen = function () {\n"
+" Console.log('Info: WebSocket connection opened.');\n"
+" document.getElementById('chat').onkeydown = function(event) {\n"
+" if (event.keyCode == 13) {\n"
+" Chat.sendMessage();\n"
+" }\n"
+" };\n"
+" };\n"
+"\n"
+" Chat.socket.onclose = function () {\n"
+" document.getElementById('chat').onkeydown = null;\n"
+" Console.log('Info: WebSocket closed.');\n"
+" };\n"
+"\n"
+" Chat.socket.onmessage = function (message) {\n"
+" Console.log(message.data);\n"
+" };\n"
+" });\n"
+"\n"
+" Chat.initialize = function() {\n"
+" var proto = 'ws://';\n"
+" if (window.location.protocol == 'https:') {\n"
+" proto = 'wss://'\n"
+" }\n"
+" Chat.connect(proto + window.location.host + '/chat');\n"
+" };\n"
+"\n"
+" Chat.sendMessage = (function() {\n"
+" var message = document.getElementById('chat').value;\n"
+" if (message != '') {\n"
+" Chat.socket.send(message);\n"
+" document.getElementById('chat').value = '';\n"
+" }\n"
+" });\n"
+"\n"
+" var Console = {};\n"
+"\n"
+" Console.log = (function(message) {\n"
+" var console = document.getElementById('console');\n"
+" var p = document.createElement('p');\n"
+" p.style.wordWrap = 'break-word';\n"
+" p.innerHTML = message;\n"
+" console.appendChild(p);\n"
+" while (console.childNodes.length > 25) {\n"
+" console.removeChild(console.firstChild);\n"
+" }\n"
+" console.scrollTop = console.scrollHeight;\n"
+" });\n"
+"\n"
+" Chat.initialize();\n"
+"\n"
+" </script>\n"
+"</head>\n"
+"<body>\n"
+"<noscript><h2 style=\"color: #ff0000\">Seems your browser doesn't support Javascript! Websockets rely on Javascript being enabled. Please enable\n"
+" Javascript and reload this page!</h2></noscript>\n"
+"<div>\n"
+" <p><input type=\"text\" placeholder=\"type and press enter to chat\" id=\"chat\" /></p>\n"
+" <div id=\"container\">\n"
+" <div id=\"console\"/>\n"
+" </div>\n"
+"</div>\n"
+"</body>\n"
+"</html>\n"
+;
+
+static const int ws_chat_index_html_size = nxt_length(ws_chat_index_html);
diff --git a/src/test/nxt_unit_websocket_echo.c b/src/test/nxt_unit_websocket_echo.c
new file mode 100644
index 00000000..2a89cdc0
--- /dev/null
+++ b/src/test/nxt_unit_websocket_echo.c
@@ -0,0 +1,105 @@
+
+/*
+ * Copyright (C) NGINX, Inc.
+ */
+
+#include <string.h>
+#include <stdlib.h>
+
+#include <nxt_unit.h>
+#include <nxt_unit_request.h>
+#include <nxt_clang.h>
+#include <nxt_websocket.h>
+#include <nxt_unit_websocket.h>
+
+
+static void
+ws_echo_request_handler(nxt_unit_request_info_t *req)
+{
+ int rc;
+ const char *target;
+
+ rc = NXT_UNIT_OK;
+ target = nxt_unit_sptr_get(&req->request->target);
+
+ if (strcmp(target, "/") == 0) {
+ if (!nxt_unit_request_is_websocket_handshake(req)) {
+ goto notfound;
+ }
+
+ rc = nxt_unit_response_init(req, 101, 0, 0);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ goto fail;
+ }
+
+ nxt_unit_response_upgrade(req);
+ nxt_unit_response_send(req);
+
+ return;
+ }
+
+notfound:
+
+ rc = nxt_unit_response_init(req, 404, 0, 0);
+
+fail:
+
+ nxt_unit_request_done(req, rc);
+}
+
+
+static void
+ws_echo_websocket_handler(nxt_unit_websocket_frame_t *ws)
+{
+ uint8_t opcode;
+ ssize_t size;
+ nxt_unit_request_info_t *req;
+
+ static size_t buf_size = 0;
+ static uint8_t *buf = NULL;
+
+ if (buf_size < ws->content_length) {
+ buf = realloc(buf, ws->content_length);
+ buf_size = ws->content_length;
+ }
+
+ req = ws->req;
+ opcode = ws->header->opcode;
+
+ if (opcode == NXT_WEBSOCKET_OP_PONG) {
+ nxt_unit_websocket_done(ws);
+ return;
+ }
+
+ size = nxt_unit_websocket_read(ws, buf, ws->content_length);
+
+ nxt_unit_websocket_send(req, opcode, ws->header->fin, buf, size);
+ nxt_unit_websocket_done(ws);
+
+ if (opcode == NXT_WEBSOCKET_OP_CLOSE) {
+ nxt_unit_request_done(req, NXT_UNIT_OK);
+ }
+}
+
+
+int
+main()
+{
+ nxt_unit_ctx_t *ctx;
+ nxt_unit_init_t init;
+
+ memset(&init, 0, sizeof(nxt_unit_init_t));
+
+ init.callbacks.request_handler = ws_echo_request_handler;
+ init.callbacks.websocket_handler = ws_echo_websocket_handler;
+
+ ctx = nxt_unit_init(&init);
+ if (ctx == NULL) {
+ return 1;
+ }
+
+ nxt_unit_run(ctx);
+ nxt_unit_done(ctx);
+
+ return 0;
+}