summaryrefslogtreecommitdiffhomepage
path: root/src/nodejs
diff options
context:
space:
mode:
authorAndrei Belov <defan@nginx.com>2019-08-22 21:33:54 +0300
committerAndrei Belov <defan@nginx.com>2019-08-22 21:33:54 +0300
commita07c4d30a64f781f93730576b5dced32422a9935 (patch)
tree06ebfaa66845a057b8069014c5379b2dcfc80861 /src/nodejs
parent8a579acddeae0c0106e15d82aa7220ac01deba84 (diff)
parentc47af243b0e805376c4ec908f21e07dc811b33f0 (diff)
downloadunit-a07c4d30a64f781f93730576b5dced32422a9935.tar.gz
unit-a07c4d30a64f781f93730576b5dced32422a9935.tar.bz2
Merged with the default branch.1.10.0-1
Diffstat (limited to 'src/nodejs')
-rw-r--r--[-rwxr-xr-x]src/nodejs/unit-http/http.js0
-rw-r--r--[-rwxr-xr-x]src/nodejs/unit-http/http_server.js78
-rw-r--r--src/nodejs/unit-http/nxt_napi.h195
-rw-r--r--src/nodejs/unit-http/package.json9
-rw-r--r--[-rwxr-xr-x]src/nodejs/unit-http/socket.js8
-rw-r--r--src/nodejs/unit-http/unit.cpp517
-rw-r--r--src/nodejs/unit-http/unit.h29
-rw-r--r--src/nodejs/unit-http/utils.js73
-rw-r--r--src/nodejs/unit-http/websocket.js14
-rw-r--r--src/nodejs/unit-http/websocket_connection.js683
-rw-r--r--src/nodejs/unit-http/websocket_frame.js11
-rw-r--r--src/nodejs/unit-http/websocket_request.js509
-rw-r--r--src/nodejs/unit-http/websocket_router.js157
-rw-r--r--src/nodejs/unit-http/websocket_router_request.js54
-rw-r--r--src/nodejs/unit-http/websocket_server.js213
15 files changed, 2375 insertions, 175 deletions
diff --git a/src/nodejs/unit-http/http.js b/src/nodejs/unit-http/http.js
index 3a25fa2f..3a25fa2f 100755..100644
--- a/src/nodejs/unit-http/http.js
+++ b/src/nodejs/unit-http/http.js
diff --git a/src/nodejs/unit-http/http_server.js b/src/nodejs/unit-http/http_server.js
index ae8e204a..c42149a5 100755..100644
--- a/src/nodejs/unit-http/http_server.js
+++ b/src/nodejs/unit-http/http_server.js
@@ -8,16 +8,21 @@
const EventEmitter = require('events');
const http = require('http');
const util = require('util');
-const unit_lib = require('unit-http/build/Release/unit-http.node');
-const unit_socket = require('unit-http/socket');
-
-const { Socket } = unit_socket;
+const unit_lib = require('./build/Release/unit-http');
+const Socket = require('./socket');
+const WebSocketFrame = require('./websocket_frame');
function ServerResponse(req) {
EventEmitter.call(this);
this.headers = {};
+
+ this.server = req.server;
+ this._request = req;
+ req._response = this;
+ this.socket = req.socket;
+ this.connection = req.connection;
}
util.inherits(ServerResponse, EventEmitter);
@@ -195,6 +200,8 @@ function writeHead(statusCode, reason, obj) {
}
}
}
+
+ return this;
};
/*
@@ -205,15 +212,23 @@ ServerResponse.prototype._implicitHeader = function _implicitHeader() {
this.writeHead(this.statusCode);
};
-ServerResponse.prototype._writeBody = function(chunk, encoding, callback) {
- var contentLength = 0;
+ServerResponse.prototype._send_headers = unit_lib.response_send_headers;
+ServerResponse.prototype._sendHeaders = function _sendHeaders() {
if (!this.headersSent) {
- unit_lib.unit_response_headers(this, this.statusCode, this.headers,
- this.headers_count, this.headers_len);
+ this._send_headers(this.statusCode, this.headers, this.headers_count,
+ this.headers_len);
this.headersSent = true;
}
+};
+
+ServerResponse.prototype._write = unit_lib.response_write;
+
+ServerResponse.prototype._writeBody = function(chunk, encoding, callback) {
+ var contentLength = 0;
+
+ this._sendHeaders();
if (typeof chunk === 'function') {
callback = chunk;
@@ -236,7 +251,7 @@ ServerResponse.prototype._writeBody = function(chunk, encoding, callback) {
contentLength = chunk.length;
}
- unit_lib.unit_response_write(this, chunk, contentLength);
+ this._write(chunk, contentLength);
}
if (typeof callback === 'function') {
@@ -266,11 +281,13 @@ ServerResponse.prototype.write = function write(chunk, encoding, callback) {
return true;
};
+ServerResponse.prototype._end = unit_lib.response_end;
+
ServerResponse.prototype.end = function end(chunk, encoding, callback) {
if (!this.finished) {
this._writeBody(chunk, encoding, callback);
- unit_lib.unit_response_end(this);
+ this._end();
this.finished = true;
}
@@ -278,10 +295,12 @@ ServerResponse.prototype.end = function end(chunk, encoding, callback) {
return this;
};
-function ServerRequest(server) {
+function ServerRequest(server, socket) {
EventEmitter.call(this);
this.server = server;
+ this.socket = socket;
+ this.connection = socket;
}
util.inherits(ServerRequest, EventEmitter);
@@ -337,8 +356,8 @@ ServerRequest.prototype.on = function on(ev, fn) {
if (ev === "data") {
process.nextTick(function () {
- if (this.server.buffer.length !== 0) {
- this.emit("data", this.server.buffer);
+ if (this._data.length !== 0) {
+ this.emit("data", this._data);
}
}.bind(this));
@@ -355,14 +374,27 @@ function Server(requestListener) {
this.unit.createServer();
- this.socket = Socket;
- this.request = ServerRequest;
- this.response = ServerResponse;
+ this.Socket = Socket;
+ this.ServerRequest = ServerRequest;
+ this.ServerResponse = ServerResponse;
+ this.WebSocketFrame = WebSocketFrame;
if (requestListener) {
this.on('request', requestListener);
}
+
+ this._upgradeListenerCount = 0;
+ this.on('newListener', function(ev) {
+ if (ev === 'upgrade'){
+ this._upgradeListenerCount++;
+ }
+ }).on('removeListener', function(ev) {
+ if (ev === 'upgrade') {
+ this._upgradeListenerCount--;
+ }
+ });
}
+
util.inherits(Server, EventEmitter);
Server.prototype.setTimeout = function setTimeout(msecs, callback) {
@@ -379,15 +411,13 @@ Server.prototype.listen = function () {
this.unit.listen();
};
-Server.prototype.emit_events = function (server, req, res) {
- req.server = server;
- res.server = server;
- req.res = res;
- res.req = req;
-
- server.buffer = server.unit._read(req.socket.req_pointer);
+Server.prototype.emit_request = function (req, res) {
+ if (req._websocket_handshake && this._upgradeListenerCount > 0) {
+ this.emit('upgrade', req, req.socket);
- server.emit("request", req, res);
+ } else {
+ this.emit("request", req, res);
+ }
process.nextTick(() => {
req.emit("finish");
diff --git a/src/nodejs/unit-http/nxt_napi.h b/src/nodejs/unit-http/nxt_napi.h
index 9bcf3a21..d9721a40 100644
--- a/src/nodejs/unit-http/nxt_napi.h
+++ b/src/nodejs/unit-http/nxt_napi.h
@@ -188,6 +188,21 @@ struct nxt_napi {
}
+ inline void *
+ get_buffer_info(napi_value val, size_t &size)
+ {
+ void *res;
+ napi_status status;
+
+ status = napi_get_buffer_info(env_, val, &res, &size);
+ if (status != napi_ok) {
+ throw exception("Failed to get buffer info");
+ }
+
+ return res;
+ }
+
+
inline napi_value
get_cb_info(napi_callback_info info, size_t &argc, napi_value *argv)
{
@@ -219,6 +234,23 @@ struct nxt_napi {
inline napi_value
+ get_cb_info(napi_callback_info info, napi_value &arg)
+ {
+ size_t argc;
+ napi_value res;
+
+ argc = 1;
+ res = get_cb_info(info, argc, &arg);
+
+ if (argc != 1) {
+ throw exception("Wrong args count. Expected 1");
+ }
+
+ return res;
+ }
+
+
+ inline napi_value
get_element(napi_value obj, uint32_t i)
{
napi_value res;
@@ -311,15 +343,22 @@ struct nxt_napi {
inline nxt_unit_request_info_t *
get_request_info(napi_value obj)
{
- int64_t n;
+ return (nxt_unit_request_info_t *) unwrap(obj);
+ }
+
+
+ inline uint32_t
+ get_value_bool(napi_value obj)
+ {
+ bool res;
napi_status status;
- status = napi_get_value_int64(env_, obj, &n);
+ status = napi_get_value_bool(env_, obj, &res);
if (status != napi_ok) {
- throw exception("Failed to get request pointer");
+ throw exception("Failed to get bool");
}
- return (nxt_unit_request_info_t *) (intptr_t) n;
+ return res;
}
@@ -353,6 +392,21 @@ struct nxt_napi {
}
+ inline size_t
+ get_value_string_utf8(napi_value val, char *buf, size_t bufsize)
+ {
+ size_t res;
+ napi_status status;
+
+ status = napi_get_value_string_utf8(env_, val, buf, bufsize, &res);
+ if (status != napi_ok) {
+ throw exception("Failed to get string utf8");
+ }
+
+ return res;
+ }
+
+
inline bool
is_array(napi_value val)
{
@@ -368,6 +422,21 @@ struct nxt_napi {
}
+ inline bool
+ is_buffer(napi_value val)
+ {
+ bool res;
+ napi_status status;
+
+ status = napi_is_buffer(env_, val, &res);
+ if (status != napi_ok) {
+ throw exception("Failed to confirm value is buffer");
+ }
+
+ return res;
+ }
+
+
inline napi_value
make_callback(napi_async_context ctx, napi_value val, napi_value func,
int argc, const napi_value *argv)
@@ -398,6 +467,41 @@ struct nxt_napi {
inline napi_value
+ make_callback(napi_async_context ctx, napi_value val, napi_value func)
+ {
+ return make_callback(ctx, val, func, 0, NULL);
+ }
+
+
+ inline napi_value
+ make_callback(napi_async_context ctx, napi_value val, napi_value func,
+ napi_value arg1)
+ {
+ return make_callback(ctx, val, func, 1, &arg1);
+ }
+
+
+ inline napi_value
+ make_callback(napi_async_context ctx, napi_value val, napi_value func,
+ napi_value arg1, napi_value arg2)
+ {
+ napi_value args[2] = { arg1, arg2 };
+
+ return make_callback(ctx, val, func, 2, args);
+ }
+
+
+ inline napi_value
+ make_callback(napi_async_context ctx, napi_value val, napi_value func,
+ napi_value arg1, napi_value arg2, napi_value arg3)
+ {
+ napi_value args[3] = { arg1, arg2, arg3 };
+
+ return make_callback(ctx, val, func, 3, args);
+ }
+
+
+ inline napi_value
new_instance(napi_value ctor)
{
napi_value res;
@@ -427,6 +531,22 @@ struct nxt_napi {
}
+ inline napi_value
+ new_instance(napi_value ctor, napi_value param1, napi_value param2)
+ {
+ napi_value res;
+ napi_status status;
+ napi_value param[2] = { param1, param2 };
+
+ status = napi_new_instance(env_, ctor, 2, param, &res);
+ if (status != napi_ok) {
+ throw exception("Failed to create instance");
+ }
+
+ return res;
+ }
+
+
inline void
set_element(napi_value obj, uint32_t i, napi_value val)
{
@@ -472,8 +592,46 @@ struct nxt_napi {
}
+ template<typename T>
inline void
- set_named_property(napi_value obj, const char *name, intptr_t val)
+ set_named_property(napi_value obj, const char *name, T val)
+ {
+ set_named_property(obj, name, create(val));
+ }
+
+
+ inline napi_value
+ create(int32_t val)
+ {
+ napi_value ptr;
+ napi_status status;
+
+ status = napi_create_int32(env_, val, &ptr);
+ if (status != napi_ok) {
+ throw exception("Failed to create int32");
+ }
+
+ return ptr;
+ }
+
+
+ inline napi_value
+ create(uint32_t val)
+ {
+ napi_value ptr;
+ napi_status status;
+
+ status = napi_create_uint32(env_, val, &ptr);
+ if (status != napi_ok) {
+ throw exception("Failed to create uint32");
+ }
+
+ return ptr;
+ }
+
+
+ inline napi_value
+ create(int64_t val)
{
napi_value ptr;
napi_status status;
@@ -483,7 +641,32 @@ struct nxt_napi {
throw exception("Failed to create int64");
}
- set_named_property(obj, name, ptr);
+ return ptr;
+ }
+
+
+ inline void
+ remove_wrap(napi_ref& ref)
+ {
+ if (ref != nullptr) {
+ remove_wrap(get_reference_value(ref));
+ ref = nullptr;
+ }
+ }
+
+
+ inline void *
+ remove_wrap(napi_value val)
+ {
+ void *res;
+ napi_status status;
+
+ status = napi_remove_wrap(env_, val, &res);
+ if (status != napi_ok) {
+ throw exception("Failed to remove_wrap");
+ }
+
+ return res;
}
diff --git a/src/nodejs/unit-http/package.json b/src/nodejs/unit-http/package.json
index 6a6c00b4..7ee01346 100644
--- a/src/nodejs/unit-http/package.json
+++ b/src/nodejs/unit-http/package.json
@@ -14,7 +14,14 @@
"package.json",
"socket.js",
"binding.gyp",
- "README.md"
+ "README.md",
+ "websocket.js",
+ "websocket_connection.js",
+ "websocket_frame.js",
+ "websocket_request.js",
+ "websocket_router.js",
+ "websocket_router_request.js",
+ "websocket_server.js"
],
"scripts": {
"clean": "node-gyp clean",
diff --git a/src/nodejs/unit-http/socket.js b/src/nodejs/unit-http/socket.js
index 6e836949..b1a3abb8 100755..100644
--- a/src/nodejs/unit-http/socket.js
+++ b/src/nodejs/unit-http/socket.js
@@ -7,7 +7,7 @@
const EventEmitter = require('events');
const util = require('util');
-const unit_lib = require('unit-http/build/Release/unit-http.node');
+const unit_lib = require('./build/Release/unit-http');
function Socket(options) {
EventEmitter.call(this);
@@ -89,7 +89,7 @@ Socket.prototype.setTimeout = function setTimeout(timeout, callback) {
this.timeout = timeout;
- this.on('timeout', callback);
+ // this.on('timeout', callback);
return this;
};
@@ -101,6 +101,4 @@ Socket.prototype.write = function write(data, encoding, callback) {
};
-module.exports = {
- Socket
-};
+module.exports = Socket;
diff --git a/src/nodejs/unit-http/unit.cpp b/src/nodejs/unit-http/unit.cpp
index 3f66189a..ac10024c 100644
--- a/src/nodejs/unit-http/unit.cpp
+++ b/src/nodejs/unit-http/unit.cpp
@@ -10,6 +10,8 @@
#include <uv.h>
+#include <nxt_unit_websocket.h>
+
napi_ref Unit::constructor_;
@@ -20,17 +22,27 @@ struct nxt_nodejs_ctx_t {
};
+struct req_data_t {
+ napi_ref sock_ref;
+ napi_ref resp_ref;
+ napi_ref conn_ref;
+};
+
+
Unit::Unit(napi_env env, napi_value jsthis):
nxt_napi(env),
wrapper_(wrap(jsthis, this, destroy)),
unit_ctx_(nullptr)
{
+ nxt_unit_debug(NULL, "Unit::Unit()");
}
Unit::~Unit()
{
delete_reference(wrapper_);
+
+ nxt_unit_debug(NULL, "Unit::~Unit()");
}
@@ -38,23 +50,26 @@ napi_value
Unit::init(napi_env env, napi_value exports)
{
nxt_napi napi(env);
- napi_value cons;
+ napi_value ctor;
- napi_property_descriptor properties[] = {
+ napi_property_descriptor unit_props[] = {
{ "createServer", 0, create_server, 0, 0, 0, napi_default, 0 },
{ "listen", 0, listen, 0, 0, 0, napi_default, 0 },
- { "_read", 0, _read, 0, 0, 0, napi_default, 0 }
};
try {
- cons = napi.define_class("Unit", create, 3, properties);
- constructor_ = napi.create_reference(cons);
+ ctor = napi.define_class("Unit", create, 2, unit_props);
+ constructor_ = napi.create_reference(ctor);
- napi.set_named_property(exports, "Unit", cons);
- napi.set_named_property(exports, "unit_response_headers",
+ napi.set_named_property(exports, "Unit", ctor);
+ napi.set_named_property(exports, "response_send_headers",
response_send_headers);
- napi.set_named_property(exports, "unit_response_write", response_write);
- napi.set_named_property(exports, "unit_response_end", response_end);
+ napi.set_named_property(exports, "response_write", response_write);
+ napi.set_named_property(exports, "response_end", response_end);
+ napi.set_named_property(exports, "websocket_send_frame",
+ websocket_send_frame);
+ napi.set_named_property(exports, "websocket_set_sock",
+ websocket_set_sock);
} catch (exception &e) {
napi.throw_error(e);
@@ -78,7 +93,7 @@ napi_value
Unit::create(napi_env env, napi_callback_info info)
{
nxt_napi napi(env);
- napi_value target, cons, instance, jsthis;
+ napi_value target, ctor, instance, jsthis;
try {
target = napi.get_new_target(info);
@@ -94,8 +109,8 @@ Unit::create(napi_env env, napi_callback_info info)
}
/* Invoked as plain function `Unit(...)`, turn into construct call. */
- cons = napi.get_reference_value(constructor_);
- instance = napi.new_instance(cons);
+ ctor = napi.get_reference_value(constructor_);
+ instance = napi.new_instance(ctor);
napi.create_reference(instance);
} catch (exception &e) {
@@ -130,10 +145,14 @@ Unit::create_server(napi_env env, napi_callback_info info)
memset(&unit_init, 0, sizeof(nxt_unit_init_t));
unit_init.data = obj;
- unit_init.callbacks.request_handler = request_handler;
- unit_init.callbacks.add_port = add_port;
- unit_init.callbacks.remove_port = remove_port;
- unit_init.callbacks.quit = quit;
+ unit_init.callbacks.request_handler = request_handler_cb;
+ unit_init.callbacks.websocket_handler = websocket_handler_cb;
+ unit_init.callbacks.close_handler = close_handler_cb;
+ unit_init.callbacks.add_port = add_port;
+ unit_init.callbacks.remove_port = remove_port;
+ unit_init.callbacks.quit = quit_cb;
+
+ unit_init.request_data_size = sizeof(req_data_t);
obj->unit_ctx_ = nxt_unit_init(&unit_init);
if (obj->unit_ctx_ == NULL) {
@@ -157,74 +176,139 @@ Unit::listen(napi_env env, napi_callback_info info)
}
-napi_value
-Unit::_read(napi_env env, napi_callback_info info)
+void
+Unit::request_handler_cb(nxt_unit_request_info_t *req)
{
- void *data;
- size_t argc;
- nxt_napi napi(env);
- napi_value buffer, argv;
- nxt_unit_request_info_t *req;
+ Unit *obj;
- argc = 1;
+ obj = reinterpret_cast<Unit *>(req->unit->data);
+
+ obj->request_handler(req);
+}
+
+
+void
+Unit::request_handler(nxt_unit_request_info_t *req)
+{
+ napi_value socket, request, response, server_obj, emit_request;
+
+ memset(req->data, 0, sizeof(req_data_t));
try {
- napi.get_cb_info(info, argc, &argv);
+ nxt_handle_scope scope(env());
+
+ server_obj = get_server_object();
+
+ socket = create_socket(server_obj, req);
+ request = create_request(server_obj, socket);
+ response = create_response(server_obj, request, req);
+
+ create_headers(req, request);
- req = napi.get_request_info(argv);
- buffer = napi.create_buffer((size_t) req->content_length, &data);
+ emit_request = get_named_property(server_obj, "emit_request");
+
+ nxt_async_context async_context(env(), "request_handler");
+ nxt_callback_scope async_scope(async_context);
+
+ make_callback(async_context, server_obj, emit_request, request,
+ response);
} catch (exception &e) {
- napi.throw_error(e);
- return nullptr;
+ nxt_unit_req_warn(req, "request_handler: %s", e.str);
}
+}
- nxt_unit_request_read(req, data, req->content_length);
- return buffer;
+void
+Unit::websocket_handler_cb(nxt_unit_websocket_frame_t *ws)
+{
+ Unit *obj;
+
+ obj = reinterpret_cast<Unit *>(ws->req->unit->data);
+
+ obj->websocket_handler(ws);
}
void
-Unit::request_handler(nxt_unit_request_info_t *req)
+Unit::websocket_handler(nxt_unit_websocket_frame_t *ws)
{
- Unit *obj;
- napi_value socket, request, response, server_obj;
- napi_value emit_events;
- napi_value events_args[3];
+ napi_value frame, server_obj, process_frame, conn;
+ req_data_t *req_data;
- obj = reinterpret_cast<Unit *>(req->unit->data);
+ req_data = (req_data_t *) ws->req->data;
try {
- nxt_handle_scope scope(obj->env());
-
- server_obj = obj->get_server_object();
+ nxt_handle_scope scope(env());
- socket = obj->create_socket(server_obj, req);
- request = obj->create_request(server_obj, socket);
- response = obj->create_response(server_obj, socket, request, req);
+ server_obj = get_server_object();
- obj->create_headers(req, request);
+ frame = create_websocket_frame(server_obj, ws);
- emit_events = obj->get_named_property(server_obj, "emit_events");
+ conn = get_reference_value(req_data->conn_ref);
- events_args[0] = server_obj;
- events_args[1] = request;
- events_args[2] = response;
+ process_frame = get_named_property(conn, "processFrame");
- nxt_async_context async_context(obj->env(), "unit_request_handler");
+ nxt_async_context async_context(env(), "websocket_handler");
nxt_callback_scope async_scope(async_context);
- obj->make_callback(async_context, server_obj, emit_events,
- 3, events_args);
+ make_callback(async_context, conn, process_frame, frame);
} catch (exception &e) {
- obj->throw_error(e);
+ nxt_unit_req_warn(ws->req, "websocket_handler: %s", e.str);
}
+
+ nxt_unit_websocket_done(ws);
+}
+
+
+void
+Unit::close_handler_cb(nxt_unit_request_info_t *req)
+{
+ Unit *obj;
+
+ obj = reinterpret_cast<Unit *>(req->unit->data);
+
+ obj->close_handler(req);
}
void
+Unit::close_handler(nxt_unit_request_info_t *req)
+{
+ napi_value conn_handle_close, conn;
+ req_data_t *req_data;
+
+ req_data = (req_data_t *) req->data;
+
+ try {
+ nxt_handle_scope scope(env());
+
+ conn = get_reference_value(req_data->conn_ref);
+
+ conn_handle_close = get_named_property(conn, "handleSocketClose");
+
+ nxt_async_context async_context(env(), "close_handler");
+ nxt_callback_scope async_scope(async_context);
+
+ make_callback(async_context, conn, conn_handle_close,
+ nxt_napi::create(0));
+
+ remove_wrap(req_data->sock_ref);
+ remove_wrap(req_data->resp_ref);
+ remove_wrap(req_data->conn_ref);
+
+ } catch (exception &e) {
+ nxt_unit_req_warn(req, "close_handler: %s", e.str);
+
+ return;
+ }
+
+ nxt_unit_request_done(req, NXT_UNIT_OK);
+}
+
+
+static void
nxt_uv_read_callback(uv_poll_t *handle, int status, int events)
{
nxt_unit_run_once((nxt_unit_ctx_t *) handle->data);
@@ -244,14 +328,14 @@ Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
obj = reinterpret_cast<Unit *>(ctx->unit->data);
if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) {
- obj->throw_error("Failed to upgrade read"
- " file descriptor to O_NONBLOCK");
+ nxt_unit_warn(ctx, "fcntl(%d, O_NONBLOCK) failed: %s (%d)",
+ port->in_fd, strerror(errno), errno);
return -1;
}
status = napi_get_uv_event_loop(obj->env(), &loop);
if (status != napi_ok) {
- obj->throw_error("Failed to get uv.loop");
+ nxt_unit_warn(ctx, "Failed to get uv.loop");
return NXT_UNIT_ERROR;
}
@@ -259,13 +343,13 @@ Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
err = uv_poll_init(loop, &node_ctx->poll, port->in_fd);
if (err < 0) {
- obj->throw_error("Failed to init uv.poll");
+ nxt_unit_warn(ctx, "Failed to init uv.poll");
return NXT_UNIT_ERROR;
}
err = uv_poll_start(&node_ctx->poll, UV_READABLE, nxt_uv_read_callback);
if (err < 0) {
- obj->throw_error("Failed to start uv.poll");
+ nxt_unit_warn(ctx, "Failed to start uv.poll");
return NXT_UNIT_ERROR;
}
@@ -308,27 +392,35 @@ Unit::remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
void
-Unit::quit(nxt_unit_ctx_t *ctx)
+Unit::quit_cb(nxt_unit_ctx_t *ctx)
{
- Unit *obj;
- napi_value server_obj, emit_close;
+ Unit *obj;
obj = reinterpret_cast<Unit *>(ctx->unit->data);
+ obj->quit(ctx);
+}
+
+
+void
+Unit::quit(nxt_unit_ctx_t *ctx)
+{
+ napi_value server_obj, emit_close;
+
try {
- nxt_handle_scope scope(obj->env());
+ nxt_handle_scope scope(env());
- server_obj = obj->get_server_object();
+ server_obj = get_server_object();
- emit_close = obj->get_named_property(server_obj, "emit_close");
+ emit_close = get_named_property(server_obj, "emit_close");
- nxt_async_context async_context(obj->env(), "unit_quit");
+ nxt_async_context async_context(env(), "unit_quit");
nxt_callback_scope async_scope(async_context);
- obj->make_callback(async_context, server_obj, emit_close, 0, NULL);
+ make_callback(async_context, server_obj, emit_close);
} catch (exception &e) {
- obj->throw_error(e);
+ nxt_unit_debug(ctx, "quit: %s", e.str);
}
nxt_unit_done(ctx);
@@ -349,8 +441,9 @@ Unit::get_server_object()
void
Unit::create_headers(nxt_unit_request_info_t *req, napi_value request)
{
+ void *data;
uint32_t i;
- napi_value headers, raw_headers;
+ napi_value headers, raw_headers, buffer;
napi_status status;
nxt_unit_request_t *r;
@@ -373,6 +466,13 @@ Unit::create_headers(nxt_unit_request_info_t *req, napi_value request)
set_named_property(request, "httpVersion", r->version, r->version_length);
set_named_property(request, "method", r->method, r->method_length);
set_named_property(request, "url", r->target, r->target_length);
+
+ set_named_property(request, "_websocket_handshake", r->websocket_handshake);
+
+ buffer = create_buffer((size_t) req->content_length, &data);
+ nxt_unit_request_read(req, data, req->content_length);
+
+ set_named_property(request, "_data", buffer);
}
@@ -410,15 +510,18 @@ napi_value
Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req)
{
napi_value constructor, res;
+ req_data_t *req_data;
nxt_unit_request_t *r;
r = req->request;
- constructor = get_named_property(server_obj, "socket");
+ constructor = get_named_property(server_obj, "Socket");
res = new_instance(constructor);
- set_named_property(res, "req_pointer", (intptr_t) req);
+ req_data = (req_data_t *) req->data;
+ req_data->sock_ref = wrap(res, req, sock_destroy);
+
set_named_property(res, "remoteAddress", r->remote, r->remote_length);
set_named_property(res, "localAddress", r->local, r->local_length);
@@ -429,34 +532,66 @@ Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req)
napi_value
Unit::create_request(napi_value server_obj, napi_value socket)
{
- napi_value constructor, return_val;
+ napi_value constructor;
- constructor = get_named_property(server_obj, "request");
+ constructor = get_named_property(server_obj, "ServerRequest");
- return_val = new_instance(constructor, server_obj);
+ return new_instance(constructor, server_obj, socket);
+}
- set_named_property(return_val, "socket", socket);
- set_named_property(return_val, "connection", socket);
- return return_val;
+napi_value
+Unit::create_response(napi_value server_obj, napi_value request,
+ nxt_unit_request_info_t *req)
+{
+ napi_value constructor, res;
+ req_data_t *req_data;
+
+ constructor = get_named_property(server_obj, "ServerResponse");
+
+ res = new_instance(constructor, request);
+
+ req_data = (req_data_t *) req->data;
+ req_data->resp_ref = wrap(res, req, resp_destroy);
+
+ return res;
}
napi_value
-Unit::create_response(napi_value server_obj, napi_value socket,
- napi_value request, nxt_unit_request_info_t *req)
+Unit::create_websocket_frame(napi_value server_obj,
+ nxt_unit_websocket_frame_t *ws)
{
- napi_value constructor, return_val;
+ void *data;
+ napi_value constructor, res, buffer;
+ uint8_t sc[2];
+
+ constructor = get_named_property(server_obj, "WebSocketFrame");
- constructor = get_named_property(server_obj, "response");
+ res = new_instance(constructor);
- return_val = new_instance(constructor, request);
+ set_named_property(res, "fin", (bool) ws->header->fin);
+ set_named_property(res, "opcode", ws->header->opcode);
+ set_named_property(res, "length", (int64_t) ws->payload_len);
- set_named_property(return_val, "socket", socket);
- set_named_property(return_val, "connection", socket);
- set_named_property(return_val, "_req_point", (intptr_t) req);
+ if (ws->header->opcode == NXT_WEBSOCKET_OP_CLOSE) {
+ if (ws->payload_len >= 2) {
+ nxt_unit_websocket_read(ws, sc, 2);
- return return_val;
+ set_named_property(res, "closeStatus",
+ (((uint16_t) sc[0]) << 8) | sc[1]);
+
+ } else {
+ set_named_property(res, "closeStatus", -1);
+ }
+ }
+
+ buffer = create_buffer((size_t) ws->content_length, &data);
+ nxt_unit_websocket_read(ws, data, ws->content_length);
+
+ set_named_property(res, "binaryPayload", buffer);
+
+ return res;
}
@@ -472,35 +607,32 @@ Unit::response_send_headers(napi_env env, napi_callback_info info)
uint16_t hash;
nxt_napi napi(env);
napi_value this_arg, headers, keys, name, value, array_val;
- napi_value req_num, array_entry;
+ napi_value array_entry;
napi_valuetype val_type;
nxt_unit_field_t *f;
nxt_unit_request_info_t *req;
- napi_value argv[5];
+ napi_value argv[4];
- argc = 5;
+ argc = 4;
try {
this_arg = napi.get_cb_info(info, argc, argv);
- if (argc != 5) {
+ if (argc != 4) {
napi.throw_error("Wrong args count. Expected: "
"statusCode, headers, headers count, "
"headers length");
return nullptr;
}
- req_num = napi.get_named_property(argv[0], "_req_point");
-
- req = napi.get_request_info(req_num);
-
- status_code = napi.get_value_uint32(argv[1]);
- keys_count = napi.get_value_uint32(argv[3]);
- header_len = napi.get_value_uint32(argv[4]);
+ req = napi.get_request_info(this_arg);
+ status_code = napi.get_value_uint32(argv[0]);
+ keys_count = napi.get_value_uint32(argv[2]);
+ header_len = napi.get_value_uint32(argv[3]);
/* Need to reserve extra byte for C-string 0-termination. */
header_len++;
- headers = argv[2];
+ headers = argv[1];
ret = nxt_unit_response_init(req, status_code, keys_count, header_len);
if (ret != NXT_UNIT_OK) {
@@ -611,65 +743,151 @@ napi_value
Unit::response_write(napi_env env, napi_callback_info info)
{
int ret;
- char *ptr;
+ void *ptr;
size_t argc, have_buf_len;
uint32_t buf_len;
nxt_napi napi(env);
- napi_value this_arg, req_num;
- napi_status status;
+ napi_value this_arg;
nxt_unit_buf_t *buf;
napi_valuetype buf_type;
nxt_unit_request_info_t *req;
- napi_value argv[3];
+ napi_value argv[2];
- argc = 3;
+ argc = 2;
try {
this_arg = napi.get_cb_info(info, argc, argv);
- if (argc != 3) {
+ if (argc != 2) {
throw exception("Wrong args count. Expected: "
"chunk, chunk length");
}
- req_num = napi.get_named_property(argv[0], "_req_point");
- req = napi.get_request_info(req_num);
+ req = napi.get_request_info(this_arg);
+ buf_type = napi.type_of(argv[0]);
+ buf_len = napi.get_value_uint32(argv[1]) + 1;
+
+ buf = nxt_unit_response_buf_alloc(req, buf_len);
+ if (buf == NULL) {
+ throw exception("Failed to allocate response buffer");
+ }
+
+ if (buf_type == napi_string) {
+ /* TODO: will work only for utf8 content-type */
+
+ have_buf_len = napi.get_value_string_utf8(argv[0], buf->free,
+ buf_len);
- buf_len = napi.get_value_uint32(argv[2]);
+ } else {
+ ptr = napi.get_buffer_info(argv[0], have_buf_len);
- buf_type = napi.type_of(argv[1]);
+ memcpy(buf->free, ptr, have_buf_len);
+ }
+ buf->free += have_buf_len;
+
+ ret = nxt_unit_buf_send(buf);
+ if (ret != NXT_UNIT_OK) {
+ throw exception("Failed to send body buf");
+ }
} catch (exception &e) {
napi.throw_error(e);
return nullptr;
}
- buf_len++;
+ return this_arg;
+}
- buf = nxt_unit_response_buf_alloc(req, buf_len);
- if (buf == NULL) {
- goto failed;
- }
- if (buf_type == napi_string) {
- /* TODO: will work only for utf8 content-type */
+napi_value
+Unit::response_end(napi_env env, napi_callback_info info)
+{
+ nxt_napi napi(env);
+ napi_value this_arg;
+ req_data_t *req_data;
+ nxt_unit_request_info_t *req;
+
+ try {
+ this_arg = napi.get_cb_info(info);
+
+ req = napi.get_request_info(this_arg);
- status = napi_get_value_string_utf8(env, argv[1], buf->free,
- buf_len, &have_buf_len);
+ req_data = (req_data_t *) req->data;
- } else {
- status = napi_get_buffer_info(env, argv[1], (void **) &ptr,
- &have_buf_len);
+ napi.remove_wrap(req_data->sock_ref);
+ napi.remove_wrap(req_data->resp_ref);
+ napi.remove_wrap(req_data->conn_ref);
- memcpy(buf->free, ptr, have_buf_len);
+ } catch (exception &e) {
+ napi.throw_error(e);
+ return nullptr;
}
- if (status != napi_ok) {
- goto failed;
+ nxt_unit_request_done(req, NXT_UNIT_OK);
+
+ return this_arg;
+}
+
+
+napi_value
+Unit::websocket_send_frame(napi_env env, napi_callback_info info)
+{
+ int ret, iovec_len;
+ bool fin;
+ size_t buf_len;
+ uint32_t opcode, sc;
+ nxt_napi napi(env);
+ napi_value this_arg, frame, payload;
+ nxt_unit_request_info_t *req;
+ char status_code[2];
+ struct iovec iov[2];
+
+ iovec_len = 0;
+
+ try {
+ this_arg = napi.get_cb_info(info, frame);
+
+ req = napi.get_request_info(this_arg);
+
+ opcode = napi.get_value_uint32(napi.get_named_property(frame,
+ "opcode"));
+ if (opcode == NXT_WEBSOCKET_OP_CLOSE) {
+ sc = napi.get_value_uint32(napi.get_named_property(frame,
+ "closeStatus"));
+ status_code[0] = (sc >> 8) & 0xFF;
+ status_code[1] = sc & 0xFF;
+
+ iov[iovec_len].iov_base = status_code;
+ iov[iovec_len].iov_len = 2;
+ iovec_len++;
+ }
+
+ try {
+ fin = napi.get_value_bool(napi.get_named_property(frame, "fin"));
+
+ } catch (exception &e) {
+ fin = true;
+ }
+
+ payload = napi.get_named_property(frame, "binaryPayload");
+
+ if (napi.is_buffer(payload)) {
+ iov[iovec_len].iov_base = napi.get_buffer_info(payload, buf_len);
+
+ } else {
+ buf_len = 0;
+ }
+
+ } catch (exception &e) {
+ napi.throw_error(e);
+ return nullptr;
}
- buf->free += have_buf_len;
+ if (buf_len > 0) {
+ iov[iovec_len].iov_len = buf_len;
+ iovec_len++;
+ }
- ret = nxt_unit_buf_send(buf);
+ ret = nxt_unit_websocket_sendv(req, opcode, fin ? 1 : 0, iov, iovec_len);
if (ret != NXT_UNIT_OK) {
goto failed;
}
@@ -678,34 +896,65 @@ Unit::response_write(napi_env env, napi_callback_info info)
failed:
- napi.throw_error("Failed to write body");
+ napi.throw_error("Failed to send frame");
return nullptr;
}
napi_value
-Unit::response_end(napi_env env, napi_callback_info info)
+Unit::websocket_set_sock(napi_env env, napi_callback_info info)
{
- size_t argc;
nxt_napi napi(env);
- napi_value resp, this_arg, req_num;
+ napi_value this_arg, sock;
+ req_data_t *req_data;
nxt_unit_request_info_t *req;
- argc = 1;
-
try {
- this_arg = napi.get_cb_info(info, argc, &resp);
+ this_arg = napi.get_cb_info(info, sock);
- req_num = napi.get_named_property(resp, "_req_point");
- req = napi.get_request_info(req_num);
+ req = napi.get_request_info(sock);
+
+ req_data = (req_data_t *) req->data;
+ req_data->conn_ref = napi.wrap(this_arg, req, conn_destroy);
} catch (exception &e) {
napi.throw_error(e);
return nullptr;
}
- nxt_unit_request_done(req, NXT_UNIT_OK);
-
return this_arg;
}
+
+
+void
+Unit::conn_destroy(napi_env env, void *nativeObject, void *finalize_hint)
+{
+ nxt_unit_request_info_t *req;
+
+ req = (nxt_unit_request_info_t *) nativeObject;
+
+ nxt_unit_warn(NULL, "conn_destroy: %p", req);
+}
+
+
+void
+Unit::sock_destroy(napi_env env, void *nativeObject, void *finalize_hint)
+{
+ nxt_unit_request_info_t *req;
+
+ req = (nxt_unit_request_info_t *) nativeObject;
+
+ nxt_unit_warn(NULL, "sock_destroy: %p", req);
+}
+
+
+void
+Unit::resp_destroy(napi_env env, void *nativeObject, void *finalize_hint)
+{
+ nxt_unit_request_info_t *req;
+
+ req = (nxt_unit_request_info_t *) nativeObject;
+
+ nxt_unit_warn(NULL, "resp_destroy: %p", req);
+}
diff --git a/src/nodejs/unit-http/unit.h b/src/nodejs/unit-http/unit.h
index e76d805a..f5eaf9fd 100644
--- a/src/nodejs/unit-http/unit.h
+++ b/src/nodejs/unit-http/unit.h
@@ -19,14 +19,28 @@ private:
static napi_value create(napi_env env, napi_callback_info info);
static void destroy(napi_env env, void *nativeObject, void *finalize_hint);
+ static void conn_destroy(napi_env env, void *nativeObject, void *finalize_hint);
+ static void sock_destroy(napi_env env, void *nativeObject, void *finalize_hint);
+ static void resp_destroy(napi_env env, void *nativeObject, void *finalize_hint);
static napi_value create_server(napi_env env, napi_callback_info info);
static napi_value listen(napi_env env, napi_callback_info info);
static napi_value _read(napi_env env, napi_callback_info info);
- static void request_handler(nxt_unit_request_info_t *req);
+
+ static void request_handler_cb(nxt_unit_request_info_t *req);
+ void request_handler(nxt_unit_request_info_t *req);
+
+ static void websocket_handler_cb(nxt_unit_websocket_frame_t *ws);
+ void websocket_handler(nxt_unit_websocket_frame_t *ws);
+
+ static void close_handler_cb(nxt_unit_request_info_t *req);
+ void close_handler(nxt_unit_request_info_t *req);
+
static int add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
static void remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
- static void quit(nxt_unit_ctx_t *ctx);
+
+ static void quit_cb(nxt_unit_ctx_t *ctx);
+ void quit(nxt_unit_ctx_t *ctx);
napi_value get_server_object();
@@ -35,20 +49,25 @@ private:
napi_value create_request(napi_value server_obj, napi_value socket);
- napi_value create_response(napi_value server_obj, napi_value socket,
- napi_value request,
+ napi_value create_response(napi_value server_obj, napi_value request,
nxt_unit_request_info_t *req);
+ napi_value create_websocket_frame(napi_value server_obj,
+ nxt_unit_websocket_frame_t *ws);
+
static napi_value response_send_headers(napi_env env,
napi_callback_info info);
static napi_value response_write(napi_env env, napi_callback_info info);
static napi_value response_end(napi_env env, napi_callback_info info);
+ static napi_value websocket_send_frame(napi_env env,
+ napi_callback_info info);
+ static napi_value websocket_set_sock(napi_env env, napi_callback_info info);
void create_headers(nxt_unit_request_info_t *req, napi_value request);
void append_header(nxt_unit_field_t *f, napi_value headers,
- napi_value raw_headers, uint32_t idx);
+ napi_value raw_headers, uint32_t idx);
static napi_ref constructor_;
diff --git a/src/nodejs/unit-http/utils.js b/src/nodejs/unit-http/utils.js
new file mode 100644
index 00000000..e1e51b0e
--- /dev/null
+++ b/src/nodejs/unit-http/utils.js
@@ -0,0 +1,73 @@
+var noop = exports.noop = function(){};
+
+exports.extend = function extend(dest, source) {
+ for (var prop in source) {
+ dest[prop] = source[prop];
+ }
+};
+
+exports.eventEmitterListenerCount =
+ require('events').EventEmitter.listenerCount ||
+ function(emitter, type) { return emitter.listeners(type).length; };
+
+exports.bufferAllocUnsafe = Buffer.allocUnsafe ?
+ Buffer.allocUnsafe :
+ function oldBufferAllocUnsafe(size) { return new Buffer(size); };
+
+exports.bufferFromString = Buffer.from ?
+ Buffer.from :
+ function oldBufferFromString(string, encoding) {
+ return new Buffer(string, encoding);
+ };
+
+exports.BufferingLogger = function createBufferingLogger(identifier, uniqueID) {
+ try {
+ var logFunction = require('debug')(identifier);
+ }
+ catch(e) {
+ logFunction = noop;
+ logFunction.enabled = false;
+ }
+
+ if (logFunction.enabled) {
+ var logger = new BufferingLogger(identifier, uniqueID, logFunction);
+ var debug = logger.log.bind(logger);
+ debug.printOutput = logger.printOutput.bind(logger);
+ debug.enabled = logFunction.enabled;
+ return debug;
+ }
+ logFunction.printOutput = noop;
+ return logFunction;
+};
+
+function BufferingLogger(identifier, uniqueID, logFunction) {
+ this.logFunction = logFunction;
+ this.identifier = identifier;
+ this.uniqueID = uniqueID;
+ this.buffer = [];
+}
+
+BufferingLogger.prototype.log = function() {
+ this.buffer.push([ new Date(), Array.prototype.slice.call(arguments) ]);
+ return this;
+};
+
+BufferingLogger.prototype.clear = function() {
+ this.buffer = [];
+ return this;
+};
+
+BufferingLogger.prototype.printOutput = function(logFunction) {
+ if (!logFunction) { logFunction = this.logFunction; }
+ var uniqueID = this.uniqueID;
+ this.buffer.forEach(function(entry) {
+ var date = entry[0].toLocaleString();
+ var args = entry[1].slice();
+ var formatString = args[0];
+ if (formatString !== (void 0) && formatString !== null) {
+ formatString = '%s - %s - ' + formatString.toString();
+ args.splice(0, 1, formatString, date, uniqueID);
+ logFunction.apply(global, args);
+ }
+ });
+};
diff --git a/src/nodejs/unit-http/websocket.js b/src/nodejs/unit-http/websocket.js
new file mode 100644
index 00000000..36d0e07a
--- /dev/null
+++ b/src/nodejs/unit-http/websocket.js
@@ -0,0 +1,14 @@
+
+/*
+ * Copyright (C) NGINX, Inc.
+ */
+
+'use strict';
+
+module.exports = {
+ 'server' : require('./websocket_server'),
+ 'router' : require('./websocket_router'),
+ 'frame' : require('./websocket_frame'),
+ 'request' : require('./websocket_request'),
+ 'connection' : require('./websocket_connection'),
+};
diff --git a/src/nodejs/unit-http/websocket_connection.js b/src/nodejs/unit-http/websocket_connection.js
new file mode 100644
index 00000000..4eccf6bf
--- /dev/null
+++ b/src/nodejs/unit-http/websocket_connection.js
@@ -0,0 +1,683 @@
+/************************************************************************
+ * Copyright 2010-2015 Brian McKelvey.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ***********************************************************************/
+
+var util = require('util');
+var utils = require('./utils');
+var unit_lib = require('./build/Release/unit-http');
+var EventEmitter = require('events').EventEmitter;
+var WebSocketFrame = require('./websocket_frame');
+var bufferAllocUnsafe = utils.bufferAllocUnsafe;
+var bufferFromString = utils.bufferFromString;
+
+// Connected, fully-open, ready to send and receive frames
+const STATE_OPEN = 'open';
+// Received a close frame from the remote peer
+const STATE_PEER_REQUESTED_CLOSE = 'peer_requested_close';
+// Sent close frame to remote peer. No further data can be sent.
+const STATE_ENDING = 'ending';
+// Connection is fully closed. No further data can be sent or received.
+const STATE_CLOSED = 'closed';
+
+var idCounter = 0;
+
+function WebSocketConnection(socket, extensions, protocol, maskOutgoingPackets, config) {
+ this._debug = utils.BufferingLogger('websocket:connection', ++idCounter);
+ this._debug('constructor');
+
+ if (this._debug.enabled) {
+ instrumentSocketForDebugging(this, socket);
+ }
+
+ // Superclass Constructor
+ EventEmitter.call(this);
+
+ this._pingListenerCount = 0;
+ this.on('newListener', function(ev) {
+ if (ev === 'ping'){
+ this._pingListenerCount++;
+ }
+ }).on('removeListener', function(ev) {
+ if (ev === 'ping') {
+ this._pingListenerCount--;
+ }
+ });
+
+ this.config = config;
+ this.socket = socket;
+ this.protocol = protocol;
+ this.extensions = extensions;
+ this.remoteAddress = socket.remoteAddress;
+ this.closeReasonCode = -1;
+ this.closeDescription = null;
+ this.closeEventEmitted = false;
+
+ // We have to mask outgoing packets if we're acting as a WebSocket client.
+ this.maskOutgoingPackets = maskOutgoingPackets;
+
+ this.fragmentationSize = 0; // data received so far...
+ this.frameQueue = [];
+
+ // Various bits of connection state
+ this.connected = true;
+ this.state = STATE_OPEN;
+ this.waitingForCloseResponse = false;
+ // Received TCP FIN, socket's readable stream is finished.
+ this.receivedEnd = false;
+
+ this.closeTimeout = this.config.closeTimeout;
+ this.assembleFragments = this.config.assembleFragments;
+ this.maxReceivedMessageSize = this.config.maxReceivedMessageSize;
+
+ this.outputBufferFull = false;
+ this.inputPaused = false;
+ this._closeTimerHandler = this.handleCloseTimer.bind(this);
+
+ // Disable nagle algorithm?
+ this.socket.setNoDelay(this.config.disableNagleAlgorithm);
+
+ // Make sure there is no socket inactivity timeout
+ this.socket.setTimeout(0);
+
+ // The HTTP Client seems to subscribe to socket error events
+ // and re-dispatch them in such a way that doesn't make sense
+ // for users of our client, so we want to make sure nobody
+ // else is listening for error events on the socket besides us.
+ this.socket.removeAllListeners('error');
+
+ this._set_sock(this.socket);
+}
+
+WebSocketConnection.prototype._set_sock = unit_lib.websocket_set_sock;
+WebSocketConnection.prototype._end = unit_lib.response_end;
+
+WebSocketConnection.CLOSE_REASON_NORMAL = 1000;
+WebSocketConnection.CLOSE_REASON_GOING_AWAY = 1001;
+WebSocketConnection.CLOSE_REASON_PROTOCOL_ERROR = 1002;
+WebSocketConnection.CLOSE_REASON_UNPROCESSABLE_INPUT = 1003;
+WebSocketConnection.CLOSE_REASON_RESERVED = 1004; // Reserved value. Undefined meaning.
+WebSocketConnection.CLOSE_REASON_NOT_PROVIDED = 1005; // Not to be used on the wire
+WebSocketConnection.CLOSE_REASON_ABNORMAL = 1006; // Not to be used on the wire
+WebSocketConnection.CLOSE_REASON_INVALID_DATA = 1007;
+WebSocketConnection.CLOSE_REASON_POLICY_VIOLATION = 1008;
+WebSocketConnection.CLOSE_REASON_MESSAGE_TOO_BIG = 1009;
+WebSocketConnection.CLOSE_REASON_EXTENSION_REQUIRED = 1010;
+WebSocketConnection.CLOSE_REASON_INTERNAL_SERVER_ERROR = 1011;
+WebSocketConnection.CLOSE_REASON_TLS_HANDSHAKE_FAILED = 1015; // Not to be used on the wire
+
+WebSocketConnection.CLOSE_DESCRIPTIONS = {
+ 1000: 'Normal connection closure',
+ 1001: 'Remote peer is going away',
+ 1002: 'Protocol error',
+ 1003: 'Unprocessable input',
+ 1004: 'Reserved',
+ 1005: 'Reason not provided',
+ 1006: 'Abnormal closure, no further detail available',
+ 1007: 'Invalid data received',
+ 1008: 'Policy violation',
+ 1009: 'Message too big',
+ 1010: 'Extension requested by client is required',
+ 1011: 'Internal Server Error',
+ 1015: 'TLS Handshake Failed'
+};
+
+function validateCloseReason(code) {
+ if (code < 1000) {
+ // Status codes in the range 0-999 are not used
+ return false;
+ }
+ if (code >= 1000 && code <= 2999) {
+ // Codes from 1000 - 2999 are reserved for use by the protocol. Only
+ // a few codes are defined, all others are currently illegal.
+ return [1000, 1001, 1002, 1003, 1007, 1008, 1009, 1010, 1011, 1012, 1013, 1014].indexOf(code) !== -1;
+ }
+ if (code >= 3000 && code <= 3999) {
+ // Reserved for use by libraries, frameworks, and applications.
+ // Should be registered with IANA. Interpretation of these codes is
+ // undefined by the WebSocket protocol.
+ return true;
+ }
+ if (code >= 4000 && code <= 4999) {
+ // Reserved for private use. Interpretation of these codes is
+ // undefined by the WebSocket protocol.
+ return true;
+ }
+ if (code >= 5000) {
+ return false;
+ }
+}
+
+util.inherits(WebSocketConnection, EventEmitter);
+
+WebSocketConnection.prototype._addSocketEventListeners = function() {
+ this.socket.on('error', this.handleSocketError.bind(this));
+ this.socket.on('end', this.handleSocketEnd.bind(this));
+ this.socket.on('close', this.handleSocketClose.bind(this));
+};
+
+WebSocketConnection.prototype.handleSocketError = function(error) {
+ this._debug('handleSocketError: %j', error);
+ if (this.state === STATE_CLOSED) {
+ // See https://github.com/theturtle32/WebSocket-Node/issues/288
+ this._debug(' --- Socket \'error\' after \'close\'');
+ return;
+ }
+ this.closeReasonCode = WebSocketConnection.CLOSE_REASON_ABNORMAL;
+ this.closeDescription = 'Socket Error: ' + error.syscall + ' ' + error.code;
+ this.connected = false;
+ this.state = STATE_CLOSED;
+ this.fragmentationSize = 0;
+ if (utils.eventEmitterListenerCount(this, 'error') > 0) {
+ this.emit('error', error);
+ }
+ this.socket.destroy(error);
+ this._debug.printOutput();
+
+ this._end();
+};
+
+WebSocketConnection.prototype.handleSocketEnd = function() {
+ this._debug('handleSocketEnd: received socket end. state = %s', this.state);
+ this.receivedEnd = true;
+ if (this.state === STATE_CLOSED) {
+ // When using the TLS module, sometimes the socket will emit 'end'
+ // after it emits 'close'. I don't think that's correct behavior,
+ // but we should deal with it gracefully by ignoring it.
+ this._debug(' --- Socket \'end\' after \'close\'');
+ return;
+ }
+ if (this.state !== STATE_PEER_REQUESTED_CLOSE &&
+ this.state !== STATE_ENDING) {
+ this._debug(' --- UNEXPECTED socket end.');
+ this.socket.end();
+
+ this._end();
+ }
+};
+
+WebSocketConnection.prototype.handleSocketClose = function(hadError) {
+ this._debug('handleSocketClose: received socket close');
+ this.socketHadError = hadError;
+ this.connected = false;
+ this.state = STATE_CLOSED;
+ // If closeReasonCode is still set to -1 at this point then we must
+ // not have received a close frame!!
+ if (this.closeReasonCode === -1) {
+ this.closeReasonCode = WebSocketConnection.CLOSE_REASON_ABNORMAL;
+ this.closeDescription = 'Connection dropped by remote peer.';
+ }
+ this.clearCloseTimer();
+ if (!this.closeEventEmitted) {
+ this.closeEventEmitted = true;
+ this._debug('-- Emitting WebSocketConnection close event');
+ this.emit('close', this.closeReasonCode, this.closeDescription);
+ }
+};
+
+WebSocketConnection.prototype.close = function(reasonCode, description) {
+ if (this.connected) {
+ this._debug('close: Initating clean WebSocket close sequence.');
+ if ('number' !== typeof reasonCode) {
+ reasonCode = WebSocketConnection.CLOSE_REASON_NORMAL;
+ }
+ if (!validateCloseReason(reasonCode)) {
+ throw new Error('Close code ' + reasonCode + ' is not valid.');
+ }
+ if ('string' !== typeof description) {
+ description = WebSocketConnection.CLOSE_DESCRIPTIONS[reasonCode];
+ }
+ this.closeReasonCode = reasonCode;
+ this.closeDescription = description;
+ this.setCloseTimer();
+ this.sendCloseFrame(this.closeReasonCode, this.closeDescription);
+ this.state = STATE_ENDING;
+ this.connected = false;
+ }
+};
+
+WebSocketConnection.prototype.drop = function(reasonCode, description, skipCloseFrame) {
+ this._debug('drop');
+ if (typeof(reasonCode) !== 'number') {
+ reasonCode = WebSocketConnection.CLOSE_REASON_PROTOCOL_ERROR;
+ }
+
+ if (typeof(description) !== 'string') {
+ // If no description is provided, try to look one up based on the
+ // specified reasonCode.
+ description = WebSocketConnection.CLOSE_DESCRIPTIONS[reasonCode];
+ }
+
+ this._debug('Forcefully dropping connection. skipCloseFrame: %s, code: %d, description: %s',
+ skipCloseFrame, reasonCode, description
+ );
+
+ this.closeReasonCode = reasonCode;
+ this.closeDescription = description;
+ this.frameQueue = [];
+ this.fragmentationSize = 0;
+ if (!skipCloseFrame) {
+ this.sendCloseFrame(reasonCode, description);
+ }
+ this.connected = false;
+ this.state = STATE_CLOSED;
+ this.clearCloseTimer();
+
+ if (!this.closeEventEmitted) {
+ this.closeEventEmitted = true;
+ this._debug('Emitting WebSocketConnection close event');
+ this.emit('close', this.closeReasonCode, this.closeDescription);
+ }
+
+ this._debug('Drop: destroying socket');
+ this.socket.destroy();
+
+ this._end();
+};
+
+WebSocketConnection.prototype.setCloseTimer = function() {
+ this._debug('setCloseTimer');
+ this.clearCloseTimer();
+ this._debug('Setting close timer');
+ this.waitingForCloseResponse = true;
+ this.closeTimer = setTimeout(this._closeTimerHandler, this.closeTimeout);
+};
+
+WebSocketConnection.prototype.clearCloseTimer = function() {
+ this._debug('clearCloseTimer');
+ if (this.closeTimer) {
+ this._debug('Clearing close timer');
+ clearTimeout(this.closeTimer);
+ this.waitingForCloseResponse = false;
+ this.closeTimer = null;
+ }
+};
+
+WebSocketConnection.prototype.handleCloseTimer = function() {
+ this._debug('handleCloseTimer');
+ this.closeTimer = null;
+ if (this.waitingForCloseResponse) {
+ this._debug('Close response not received from client. Forcing socket end.');
+ this.waitingForCloseResponse = false;
+ this.state = STATE_CLOSED;
+ this.socket.end();
+
+ this._end();
+ }
+};
+
+WebSocketConnection.prototype.processFrame = function(frame) {
+ if (!this.connected) {
+ return;
+ }
+
+ this._debug('processFrame');
+ this._debug(' -- frame: %s', frame);
+
+ // Any non-control opcode besides 0x00 (continuation) received in the
+ // middle of a fragmented message is illegal.
+ if (this.frameQueue.length !== 0 && (frame.opcode > 0x00 && frame.opcode < 0x08)) {
+ this.drop(WebSocketConnection.CLOSE_REASON_PROTOCOL_ERROR,
+ 'Illegal frame opcode 0x' + frame.opcode.toString(16) + ' ' +
+ 'received in middle of fragmented message.');
+ return;
+ }
+
+ switch(frame.opcode) {
+ case 0x02: // WebSocketFrame.BINARY_FRAME
+ this._debug('-- Binary Frame');
+ if (this.assembleFragments) {
+ if (frame.fin) {
+ // Complete single-frame message received
+ this._debug('---- Emitting \'message\' event');
+ this.emit('message', {
+ type: 'binary',
+ binaryData: frame.binaryPayload
+ });
+ }
+ else {
+ // beginning of a fragmented message
+ this.frameQueue.push(frame);
+ this.fragmentationSize = frame.length;
+ }
+ }
+ break;
+ case 0x01: // WebSocketFrame.TEXT_FRAME
+ this._debug('-- Text Frame');
+ if (this.assembleFragments) {
+ if (frame.fin) {
+ // Complete single-frame message received
+ this._debug('---- Emitting \'message\' event');
+ this.emit('message', {
+ type: 'utf8',
+ utf8Data: frame.binaryPayload.toString('utf8')
+ });
+ }
+ else {
+ // beginning of a fragmented message
+ this.frameQueue.push(frame);
+ this.fragmentationSize = frame.length;
+ }
+ }
+ break;
+ case 0x00: // WebSocketFrame.CONTINUATION
+ this._debug('-- Continuation Frame');
+ if (this.assembleFragments) {
+ if (this.frameQueue.length === 0) {
+ this.drop(WebSocketConnection.CLOSE_REASON_PROTOCOL_ERROR,
+ 'Unexpected Continuation Frame');
+ return;
+ }
+
+ this.fragmentationSize += frame.length;
+
+ if (this.fragmentationSize > this.maxReceivedMessageSize) {
+ this.drop(WebSocketConnection.CLOSE_REASON_MESSAGE_TOO_BIG,
+ 'Maximum message size exceeded.');
+ return;
+ }
+
+ this.frameQueue.push(frame);
+
+ if (frame.fin) {
+ // end of fragmented message, so we process the whole
+ // message now. We also have to decode the utf-8 data
+ // for text frames after combining all the fragments.
+ var bytesCopied = 0;
+ var binaryPayload = bufferAllocUnsafe(this.fragmentationSize);
+ var opcode = this.frameQueue[0].opcode;
+ this.frameQueue.forEach(function (currentFrame) {
+ currentFrame.binaryPayload.copy(binaryPayload, bytesCopied);
+ bytesCopied += currentFrame.binaryPayload.length;
+ });
+ this.frameQueue = [];
+ this.fragmentationSize = 0;
+
+ switch (opcode) {
+ case 0x02: // WebSocketOpcode.BINARY_FRAME
+ this.emit('message', {
+ type: 'binary',
+ binaryData: binaryPayload
+ });
+ break;
+ case 0x01: // WebSocketOpcode.TEXT_FRAME
+ this.emit('message', {
+ type: 'utf8',
+ utf8Data: binaryPayload.toString('utf8')
+ });
+ break;
+ default:
+ this.drop(WebSocketConnection.CLOSE_REASON_PROTOCOL_ERROR,
+ 'Unexpected first opcode in fragmentation sequence: 0x' + opcode.toString(16));
+ return;
+ }
+ }
+ }
+ break;
+ case 0x09: // WebSocketFrame.PING
+ this._debug('-- Ping Frame');
+
+ if (this._pingListenerCount > 0) {
+ // logic to emit the ping frame: this is only done when a listener is known to exist
+ // Expose a function allowing the user to override the default ping() behavior
+ var cancelled = false;
+ var cancel = function() {
+ cancelled = true;
+ };
+ this.emit('ping', cancel, frame.binaryPayload);
+
+ // Only send a pong if the client did not indicate that he would like to cancel
+ if (!cancelled) {
+ this.pong(frame.binaryPayload);
+ }
+ }
+ else {
+ this.pong(frame.binaryPayload);
+ }
+
+ break;
+ case 0x0A: // WebSocketFrame.PONG
+ this._debug('-- Pong Frame');
+ this.emit('pong', frame.binaryPayload);
+ break;
+ case 0x08: // WebSocketFrame.CONNECTION_CLOSE
+ this._debug('-- Close Frame');
+ if (this.waitingForCloseResponse) {
+ // Got response to our request to close the connection.
+ // Close is complete, so we just hang up.
+ this._debug('---- Got close response from peer. Completing closing handshake.');
+ this.clearCloseTimer();
+ this.waitingForCloseResponse = false;
+ this.state = STATE_CLOSED;
+ this.socket.end();
+
+ this._end();
+ return;
+ }
+
+ this._debug('---- Closing handshake initiated by peer.');
+ // Got request from other party to close connection.
+ // Send back acknowledgement and then hang up.
+ this.state = STATE_PEER_REQUESTED_CLOSE;
+ var respondCloseReasonCode;
+
+ // Make sure the close reason provided is legal according to
+ // the protocol spec. Providing no close status is legal.
+ // WebSocketFrame sets closeStatus to -1 by default, so if it
+ // is still -1, then no status was provided.
+ if (frame.invalidCloseFrameLength) {
+ this.closeReasonCode = 1005; // 1005 = No reason provided.
+ respondCloseReasonCode = WebSocketConnection.CLOSE_REASON_PROTOCOL_ERROR;
+ }
+ else if (frame.closeStatus === -1 || validateCloseReason(frame.closeStatus)) {
+ this.closeReasonCode = frame.closeStatus;
+ respondCloseReasonCode = WebSocketConnection.CLOSE_REASON_NORMAL;
+ }
+ else {
+ this.closeReasonCode = frame.closeStatus;
+ respondCloseReasonCode = WebSocketConnection.CLOSE_REASON_PROTOCOL_ERROR;
+ }
+
+ // If there is a textual description in the close frame, extract it.
+ if (frame.binaryPayload.length > 1) {
+ this.closeDescription = frame.binaryPayload.toString('utf8');
+ }
+ else {
+ this.closeDescription = WebSocketConnection.CLOSE_DESCRIPTIONS[this.closeReasonCode];
+ }
+ this._debug(
+ '------ Remote peer %s - code: %d - %s - close frame payload length: %d',
+ this.remoteAddress, this.closeReasonCode,
+ this.closeDescription, frame.length
+ );
+ this._debug('------ responding to remote peer\'s close request.');
+ this.drop(respondCloseReasonCode, null);
+ this.connected = false;
+ break;
+ default:
+ this._debug('-- Unrecognized Opcode %d', frame.opcode);
+ this.drop(WebSocketConnection.CLOSE_REASON_PROTOCOL_ERROR,
+ 'Unrecognized Opcode: 0x' + frame.opcode.toString(16));
+ break;
+ }
+};
+
+WebSocketConnection.prototype.send = function(data, cb) {
+ this._debug('send');
+ if (Buffer.isBuffer(data)) {
+ this.sendBytes(data, cb);
+ }
+ else if (typeof(data['toString']) === 'function') {
+ this.sendUTF(data, cb);
+ }
+ else {
+ throw new Error('Data provided must either be a Node Buffer or implement toString()');
+ }
+};
+
+WebSocketConnection.prototype.sendUTF = function(data, cb) {
+ data = bufferFromString(data.toString(), 'utf8');
+ this._debug('sendUTF: %d bytes', data.length);
+
+ var frame = new WebSocketFrame();
+ frame.opcode = 0x01; // WebSocketOpcode.TEXT_FRAME
+ frame.binaryPayload = data;
+
+ this.fragmentAndSend(frame, cb);
+};
+
+WebSocketConnection.prototype.sendBytes = function(data, cb) {
+ this._debug('sendBytes');
+ if (!Buffer.isBuffer(data)) {
+ throw new Error('You must pass a Node Buffer object to WebSocketConnection.prototype.sendBytes()');
+ }
+
+ var frame = new WebSocketFrame();
+ frame.opcode = 0x02; // WebSocketOpcode.BINARY_FRAME
+ frame.binaryPayload = data;
+
+ this.fragmentAndSend(frame, cb);
+};
+
+WebSocketConnection.prototype.ping = function(data) {
+ this._debug('ping');
+
+ var frame = new WebSocketFrame();
+ frame.opcode = 0x09; // WebSocketOpcode.PING
+ frame.fin = true;
+
+ if (data) {
+ if (!Buffer.isBuffer(data)) {
+ data = bufferFromString(data.toString(), 'utf8');
+ }
+ if (data.length > 125) {
+ this._debug('WebSocket: Data for ping is longer than 125 bytes. Truncating.');
+ data = data.slice(0,124);
+ }
+ frame.binaryPayload = data;
+ }
+
+ this.sendFrame(frame);
+};
+
+// Pong frames have to echo back the contents of the data portion of the
+// ping frame exactly, byte for byte.
+WebSocketConnection.prototype.pong = function(binaryPayload) {
+ this._debug('pong');
+
+ var frame = new WebSocketFrame();
+ frame.opcode = 0x0A; // WebSocketOpcode.PONG
+ if (Buffer.isBuffer(binaryPayload) && binaryPayload.length > 125) {
+ this._debug('WebSocket: Data for pong is longer than 125 bytes. Truncating.');
+ binaryPayload = binaryPayload.slice(0,124);
+ }
+ frame.binaryPayload = binaryPayload;
+ frame.fin = true;
+
+ this.sendFrame(frame);
+};
+
+WebSocketConnection.prototype.fragmentAndSend = function(frame, cb) {
+ this._debug('fragmentAndSend');
+ if (frame.opcode > 0x07) {
+ throw new Error('You cannot fragment control frames.');
+ }
+
+ var threshold = this.config.fragmentationThreshold;
+ var length = frame.binaryPayload.length;
+
+ // Send immediately if fragmentation is disabled or the message is not
+ // larger than the fragmentation threshold.
+ if (!this.config.fragmentOutgoingMessages || (frame.binaryPayload && length <= threshold)) {
+ frame.fin = true;
+ this.sendFrame(frame, cb);
+ return;
+ }
+
+ var numFragments = Math.ceil(length / threshold);
+ var sentFragments = 0;
+ var sentCallback = function fragmentSentCallback(err) {
+ if (err) {
+ if (typeof cb === 'function') {
+ // pass only the first error
+ cb(err);
+ cb = null;
+ }
+ return;
+ }
+ ++sentFragments;
+ if ((sentFragments === numFragments) && (typeof cb === 'function')) {
+ cb();
+ }
+ };
+ for (var i=1; i <= numFragments; i++) {
+ var currentFrame = new WebSocketFrame();
+
+ // continuation opcode except for first frame.
+ currentFrame.opcode = (i === 1) ? frame.opcode : 0x00;
+
+ // fin set on last frame only
+ currentFrame.fin = (i === numFragments);
+
+ // length is likely to be shorter on the last fragment
+ var currentLength = (i === numFragments) ? length - (threshold * (i-1)) : threshold;
+ var sliceStart = threshold * (i-1);
+
+ // Slice the right portion of the original payload
+ currentFrame.binaryPayload = frame.binaryPayload.slice(sliceStart, sliceStart + currentLength);
+
+ this.sendFrame(currentFrame, sentCallback);
+ }
+};
+
+WebSocketConnection.prototype.sendCloseFrame = function(reasonCode, description, cb) {
+ if (typeof(reasonCode) !== 'number') {
+ reasonCode = WebSocketConnection.CLOSE_REASON_NORMAL;
+ }
+
+ this._debug('sendCloseFrame state: %s, reasonCode: %d, description: %s', this.state, reasonCode, description);
+
+ if (this.state !== STATE_OPEN && this.state !== STATE_PEER_REQUESTED_CLOSE) { return; }
+
+ var frame = new WebSocketFrame();
+ frame.fin = true;
+ frame.opcode = 0x08; // WebSocketOpcode.CONNECTION_CLOSE
+ frame.closeStatus = reasonCode;
+ if (typeof(description) === 'string') {
+ frame.binaryPayload = bufferFromString(description, 'utf8');
+ }
+
+ this.sendFrame(frame, cb);
+ this.socket.end();
+};
+
+WebSocketConnection.prototype._send_frame = unit_lib.websocket_send_frame;
+
+WebSocketConnection.prototype.sendFrame = function(frame, cb) {
+ this._debug('sendFrame');
+
+ frame.mask = this.maskOutgoingPackets;
+
+ this._send_frame(frame);
+
+ if (typeof cb === 'function') {
+ cb();
+ }
+
+ var flushed = 0; // this.socket.write(frame.toBuffer(), cb);
+ this.outputBufferFull = !flushed;
+ return flushed;
+};
+
+module.exports = WebSocketConnection;
diff --git a/src/nodejs/unit-http/websocket_frame.js b/src/nodejs/unit-http/websocket_frame.js
new file mode 100644
index 00000000..9989937d
--- /dev/null
+++ b/src/nodejs/unit-http/websocket_frame.js
@@ -0,0 +1,11 @@
+
+/*
+ * Copyright (C) NGINX, Inc.
+ */
+
+'use strict';
+
+function WebSocketFrame() {
+}
+
+module.exports = WebSocketFrame;
diff --git a/src/nodejs/unit-http/websocket_request.js b/src/nodejs/unit-http/websocket_request.js
new file mode 100644
index 00000000..d84e428b
--- /dev/null
+++ b/src/nodejs/unit-http/websocket_request.js
@@ -0,0 +1,509 @@
+/************************************************************************
+ * Copyright 2010-2015 Brian McKelvey.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ***********************************************************************/
+
+var util = require('util');
+var url = require('url');
+var EventEmitter = require('events').EventEmitter;
+var WebSocketConnection = require('./websocket_connection');
+
+var headerValueSplitRegExp = /,\s*/;
+var headerParamSplitRegExp = /;\s*/;
+var headerSanitizeRegExp = /[\r\n]/g;
+var xForwardedForSeparatorRegExp = /,\s*/;
+var separators = [
+ '(', ')', '<', '>', '@',
+ ',', ';', ':', '\\', '\"',
+ '/', '[', ']', '?', '=',
+ '{', '}', ' ', String.fromCharCode(9)
+];
+var controlChars = [String.fromCharCode(127) /* DEL */];
+for (var i=0; i < 31; i ++) {
+ /* US-ASCII Control Characters */
+ controlChars.push(String.fromCharCode(i));
+}
+
+var cookieNameValidateRegEx = /([\x00-\x20\x22\x28\x29\x2c\x2f\x3a-\x3f\x40\x5b-\x5e\x7b\x7d\x7f])/;
+var cookieValueValidateRegEx = /[^\x21\x23-\x2b\x2d-\x3a\x3c-\x5b\x5d-\x7e]/;
+var cookieValueDQuoteValidateRegEx = /^"[^"]*"$/;
+var controlCharsAndSemicolonRegEx = /[\x00-\x20\x3b]/g;
+
+var cookieSeparatorRegEx = /[;,] */;
+
+var httpStatusDescriptions = {
+ 100: 'Continue',
+ 101: 'Switching Protocols',
+ 200: 'OK',
+ 201: 'Created',
+ 203: 'Non-Authoritative Information',
+ 204: 'No Content',
+ 205: 'Reset Content',
+ 206: 'Partial Content',
+ 300: 'Multiple Choices',
+ 301: 'Moved Permanently',
+ 302: 'Found',
+ 303: 'See Other',
+ 304: 'Not Modified',
+ 305: 'Use Proxy',
+ 307: 'Temporary Redirect',
+ 400: 'Bad Request',
+ 401: 'Unauthorized',
+ 402: 'Payment Required',
+ 403: 'Forbidden',
+ 404: 'Not Found',
+ 406: 'Not Acceptable',
+ 407: 'Proxy Authorization Required',
+ 408: 'Request Timeout',
+ 409: 'Conflict',
+ 410: 'Gone',
+ 411: 'Length Required',
+ 412: 'Precondition Failed',
+ 413: 'Request Entity Too Long',
+ 414: 'Request-URI Too Long',
+ 415: 'Unsupported Media Type',
+ 416: 'Requested Range Not Satisfiable',
+ 417: 'Expectation Failed',
+ 426: 'Upgrade Required',
+ 500: 'Internal Server Error',
+ 501: 'Not Implemented',
+ 502: 'Bad Gateway',
+ 503: 'Service Unavailable',
+ 504: 'Gateway Timeout',
+ 505: 'HTTP Version Not Supported'
+};
+
+function WebSocketRequest(socket, httpRequest, serverConfig) {
+ // Superclass Constructor
+ EventEmitter.call(this);
+
+ this.socket = socket;
+ this.httpRequest = httpRequest;
+ this.resource = httpRequest.url;
+ this.remoteAddress = socket.remoteAddress;
+ this.remoteAddresses = [this.remoteAddress];
+ this.serverConfig = serverConfig;
+
+ // Watch for the underlying TCP socket closing before we call accept
+ this._socketIsClosing = false;
+ this._socketCloseHandler = this._handleSocketCloseBeforeAccept.bind(this);
+ this.socket.on('end', this._socketCloseHandler);
+ this.socket.on('close', this._socketCloseHandler);
+
+ this._resolved = false;
+}
+
+util.inherits(WebSocketRequest, EventEmitter);
+
+WebSocketRequest.prototype.readHandshake = function() {
+ var self = this;
+ var request = this.httpRequest;
+
+ // Decode URL
+ this.resourceURL = url.parse(this.resource, true);
+
+ this.host = request.headers['host'];
+ if (!this.host) {
+ throw new Error('Client must provide a Host header.');
+ }
+
+ this.key = request.headers['sec-websocket-key'];
+ if (!this.key) {
+ throw new Error('Client must provide a value for Sec-WebSocket-Key.');
+ }
+
+ this.webSocketVersion = parseInt(request.headers['sec-websocket-version'], 10);
+
+ if (!this.webSocketVersion || isNaN(this.webSocketVersion)) {
+ throw new Error('Client must provide a value for Sec-WebSocket-Version.');
+ }
+
+ switch (this.webSocketVersion) {
+ case 8:
+ case 13:
+ break;
+ default:
+ var e = new Error('Unsupported websocket client version: ' + this.webSocketVersion +
+ 'Only versions 8 and 13 are supported.');
+ e.httpCode = 426;
+ e.headers = {
+ 'Sec-WebSocket-Version': '13'
+ };
+ throw e;
+ }
+
+ if (this.webSocketVersion === 13) {
+ this.origin = request.headers['origin'];
+ }
+ else if (this.webSocketVersion === 8) {
+ this.origin = request.headers['sec-websocket-origin'];
+ }
+
+ // Protocol is optional.
+ var protocolString = request.headers['sec-websocket-protocol'];
+ this.protocolFullCaseMap = {};
+ this.requestedProtocols = [];
+ if (protocolString) {
+ var requestedProtocolsFullCase = protocolString.split(headerValueSplitRegExp);
+ requestedProtocolsFullCase.forEach(function(protocol) {
+ var lcProtocol = protocol.toLocaleLowerCase();
+ self.requestedProtocols.push(lcProtocol);
+ self.protocolFullCaseMap[lcProtocol] = protocol;
+ });
+ }
+
+ if (!this.serverConfig.ignoreXForwardedFor &&
+ request.headers['x-forwarded-for']) {
+ var immediatePeerIP = this.remoteAddress;
+ this.remoteAddresses = request.headers['x-forwarded-for']
+ .split(xForwardedForSeparatorRegExp);
+ this.remoteAddresses.push(immediatePeerIP);
+ this.remoteAddress = this.remoteAddresses[0];
+ }
+
+ // Extensions are optional.
+ var extensionsString = request.headers['sec-websocket-extensions'];
+ this.requestedExtensions = this.parseExtensions(extensionsString);
+
+ // Cookies are optional
+ var cookieString = request.headers['cookie'];
+ this.cookies = this.parseCookies(cookieString);
+};
+
+WebSocketRequest.prototype.parseExtensions = function(extensionsString) {
+ if (!extensionsString || extensionsString.length === 0) {
+ return [];
+ }
+ var extensions = extensionsString.toLocaleLowerCase().split(headerValueSplitRegExp);
+ extensions.forEach(function(extension, index, array) {
+ var params = extension.split(headerParamSplitRegExp);
+ var extensionName = params[0];
+ var extensionParams = params.slice(1);
+ extensionParams.forEach(function(rawParam, index, array) {
+ var arr = rawParam.split('=');
+ var obj = {
+ name: arr[0],
+ value: arr[1]
+ };
+ array.splice(index, 1, obj);
+ });
+ var obj = {
+ name: extensionName,
+ params: extensionParams
+ };
+ array.splice(index, 1, obj);
+ });
+ return extensions;
+};
+
+// This function adapted from node-cookie
+// https://github.com/shtylman/node-cookie
+WebSocketRequest.prototype.parseCookies = function(str) {
+ // Sanity Check
+ if (!str || typeof(str) !== 'string') {
+ return [];
+ }
+
+ var cookies = [];
+ var pairs = str.split(cookieSeparatorRegEx);
+
+ pairs.forEach(function(pair) {
+ var eq_idx = pair.indexOf('=');
+ if (eq_idx === -1) {
+ cookies.push({
+ name: pair,
+ value: null
+ });
+ return;
+ }
+
+ var key = pair.substr(0, eq_idx).trim();
+ var val = pair.substr(++eq_idx, pair.length).trim();
+
+ // quoted values
+ if ('"' === val[0]) {
+ val = val.slice(1, -1);
+ }
+
+ cookies.push({
+ name: key,
+ value: decodeURIComponent(val)
+ });
+ });
+
+ return cookies;
+};
+
+WebSocketRequest.prototype.accept = function(acceptedProtocol, allowedOrigin, cookies) {
+ this._verifyResolution();
+
+ // TODO: Handle extensions
+
+ var protocolFullCase;
+
+ if (acceptedProtocol) {
+ protocolFullCase = this.protocolFullCaseMap[acceptedProtocol.toLocaleLowerCase()];
+ if (typeof(protocolFullCase) === 'undefined') {
+ protocolFullCase = acceptedProtocol;
+ }
+ }
+ else {
+ protocolFullCase = acceptedProtocol;
+ }
+ this.protocolFullCaseMap = null;
+
+ var response = this.httpRequest._response;
+ response.statusCode = 101;
+
+ if (protocolFullCase) {
+ // validate protocol
+ for (var i=0; i < protocolFullCase.length; i++) {
+ var charCode = protocolFullCase.charCodeAt(i);
+ var character = protocolFullCase.charAt(i);
+ if (charCode < 0x21 || charCode > 0x7E || separators.indexOf(character) !== -1) {
+ this.reject(500);
+ throw new Error('Illegal character "' + String.fromCharCode(character) + '" in subprotocol.');
+ }
+ }
+ if (this.requestedProtocols.indexOf(acceptedProtocol) === -1) {
+ this.reject(500);
+ throw new Error('Specified protocol was not requested by the client.');
+ }
+
+ protocolFullCase = protocolFullCase.replace(headerSanitizeRegExp, '');
+ response += 'Sec-WebSocket-Protocol: ' + protocolFullCase + '\r\n';
+ }
+ this.requestedProtocols = null;
+
+ if (allowedOrigin) {
+ allowedOrigin = allowedOrigin.replace(headerSanitizeRegExp, '');
+ if (this.webSocketVersion === 13) {
+ response.setHeader('Origin', allowedOrigin);
+ }
+ else if (this.webSocketVersion === 8) {
+ response.setHeader('Sec-WebSocket-Origin', allowedOrigin);
+ }
+ }
+
+ if (cookies) {
+ if (!Array.isArray(cookies)) {
+ this.reject(500);
+ throw new Error('Value supplied for "cookies" argument must be an array.');
+ }
+ var seenCookies = {};
+ cookies.forEach(function(cookie) {
+ if (!cookie.name || !cookie.value) {
+ this.reject(500);
+ throw new Error('Each cookie to set must at least provide a "name" and "value"');
+ }
+
+ // Make sure there are no \r\n sequences inserted
+ cookie.name = cookie.name.replace(controlCharsAndSemicolonRegEx, '');
+ cookie.value = cookie.value.replace(controlCharsAndSemicolonRegEx, '');
+
+ if (seenCookies[cookie.name]) {
+ this.reject(500);
+ throw new Error('You may not specify the same cookie name twice.');
+ }
+ seenCookies[cookie.name] = true;
+
+ // token (RFC 2616, Section 2.2)
+ var invalidChar = cookie.name.match(cookieNameValidateRegEx);
+ if (invalidChar) {
+ this.reject(500);
+ throw new Error('Illegal character ' + invalidChar[0] + ' in cookie name');
+ }
+
+ // RFC 6265, Section 4.1.1
+ // *cookie-octet / ( DQUOTE *cookie-octet DQUOTE ) | %x21 / %x23-2B / %x2D-3A / %x3C-5B / %x5D-7E
+ if (cookie.value.match(cookieValueDQuoteValidateRegEx)) {
+ invalidChar = cookie.value.slice(1, -1).match(cookieValueValidateRegEx);
+ } else {
+ invalidChar = cookie.value.match(cookieValueValidateRegEx);
+ }
+ if (invalidChar) {
+ this.reject(500);
+ throw new Error('Illegal character ' + invalidChar[0] + ' in cookie value');
+ }
+
+ var cookieParts = [cookie.name + '=' + cookie.value];
+
+ // RFC 6265, Section 4.1.1
+ // 'Path=' path-value | <any CHAR except CTLs or ';'>
+ if(cookie.path){
+ invalidChar = cookie.path.match(controlCharsAndSemicolonRegEx);
+ if (invalidChar) {
+ this.reject(500);
+ throw new Error('Illegal character ' + invalidChar[0] + ' in cookie path');
+ }
+ cookieParts.push('Path=' + cookie.path);
+ }
+
+ // RFC 6265, Section 4.1.2.3
+ // 'Domain=' subdomain
+ if (cookie.domain) {
+ if (typeof(cookie.domain) !== 'string') {
+ this.reject(500);
+ throw new Error('Domain must be specified and must be a string.');
+ }
+ invalidChar = cookie.domain.match(controlCharsAndSemicolonRegEx);
+ if (invalidChar) {
+ this.reject(500);
+ throw new Error('Illegal character ' + invalidChar[0] + ' in cookie domain');
+ }
+ cookieParts.push('Domain=' + cookie.domain.toLowerCase());
+ }
+
+ // RFC 6265, Section 4.1.1
+ //'Expires=' sane-cookie-date | Force Date object requirement by using only epoch
+ if (cookie.expires) {
+ if (!(cookie.expires instanceof Date)){
+ this.reject(500);
+ throw new Error('Value supplied for cookie "expires" must be a vaild date object');
+ }
+ cookieParts.push('Expires=' + cookie.expires.toGMTString());
+ }
+
+ // RFC 6265, Section 4.1.1
+ //'Max-Age=' non-zero-digit *DIGIT
+ if (cookie.maxage) {
+ var maxage = cookie.maxage;
+ if (typeof(maxage) === 'string') {
+ maxage = parseInt(maxage, 10);
+ }
+ if (isNaN(maxage) || maxage <= 0 ) {
+ this.reject(500);
+ throw new Error('Value supplied for cookie "maxage" must be a non-zero number');
+ }
+ maxage = Math.round(maxage);
+ cookieParts.push('Max-Age=' + maxage.toString(10));
+ }
+
+ // RFC 6265, Section 4.1.1
+ //'Secure;'
+ if (cookie.secure) {
+ if (typeof(cookie.secure) !== 'boolean') {
+ this.reject(500);
+ throw new Error('Value supplied for cookie "secure" must be of type boolean');
+ }
+ cookieParts.push('Secure');
+ }
+
+ // RFC 6265, Section 4.1.1
+ //'HttpOnly;'
+ if (cookie.httponly) {
+ if (typeof(cookie.httponly) !== 'boolean') {
+ this.reject(500);
+ throw new Error('Value supplied for cookie "httponly" must be of type boolean');
+ }
+ cookieParts.push('HttpOnly');
+ }
+
+ response.addHeader('Set-Cookie', cookieParts.join(';'));
+ }.bind(this));
+ }
+
+ // TODO: handle negotiated extensions
+ // if (negotiatedExtensions) {
+ // response += 'Sec-WebSocket-Extensions: ' + negotiatedExtensions.join(', ') + '\r\n';
+ // }
+
+ // Mark the request resolved now so that the user can't call accept or
+ // reject a second time.
+ this._resolved = true;
+ this.emit('requestResolved', this);
+
+ var connection = new WebSocketConnection(this.socket, [], acceptedProtocol, false, this.serverConfig);
+ connection.webSocketVersion = this.webSocketVersion;
+ connection.remoteAddress = this.remoteAddress;
+ connection.remoteAddresses = this.remoteAddresses;
+
+ var self = this;
+
+ if (this._socketIsClosing) {
+ // Handle case when the client hangs up before we get a chance to
+ // accept the connection and send our side of the opening handshake.
+ cleanupFailedConnection(connection);
+
+ } else {
+ response._sendHeaders();
+ connection._addSocketEventListeners();
+ }
+
+ this.emit('requestAccepted', connection);
+ return connection;
+};
+
+WebSocketRequest.prototype.reject = function(status, reason, extraHeaders) {
+ this._verifyResolution();
+
+ // Mark the request resolved now so that the user can't call accept or
+ // reject a second time.
+ this._resolved = true;
+ this.emit('requestResolved', this);
+
+ if (typeof(status) !== 'number') {
+ status = 403;
+ }
+
+ var response = this.httpRequest._response;
+
+ response.statusCode = status;
+
+ if (reason) {
+ reason = reason.replace(headerSanitizeRegExp, '');
+ response.addHeader('X-WebSocket-Reject-Reason', reason);
+ }
+
+ if (extraHeaders) {
+ for (var key in extraHeaders) {
+ var sanitizedValue = extraHeaders[key].toString().replace(headerSanitizeRegExp, '');
+ var sanitizedKey = key.replace(headerSanitizeRegExp, '');
+ response += (sanitizedKey + ': ' + sanitizedValue + '\r\n');
+ }
+ }
+
+ response.end();
+
+ this.emit('requestRejected', this);
+};
+
+WebSocketRequest.prototype._handleSocketCloseBeforeAccept = function() {
+ this._socketIsClosing = true;
+ this._removeSocketCloseListeners();
+};
+
+WebSocketRequest.prototype._removeSocketCloseListeners = function() {
+ this.socket.removeListener('end', this._socketCloseHandler);
+ this.socket.removeListener('close', this._socketCloseHandler);
+};
+
+WebSocketRequest.prototype._verifyResolution = function() {
+ if (this._resolved) {
+ throw new Error('WebSocketRequest may only be accepted or rejected one time.');
+ }
+};
+
+function cleanupFailedConnection(connection) {
+ // Since we have to return a connection object even if the socket is
+ // already dead in order not to break the API, we schedule a 'close'
+ // event on the connection object to occur immediately.
+ process.nextTick(function() {
+ // WebSocketConnection.CLOSE_REASON_ABNORMAL = 1006
+ // Third param: Skip sending the close frame to a dead socket
+ connection.drop(1006, 'TCP connection lost before handshake completed.', true);
+ });
+}
+
+module.exports = WebSocketRequest;
diff --git a/src/nodejs/unit-http/websocket_router.js b/src/nodejs/unit-http/websocket_router.js
new file mode 100644
index 00000000..4efa35d2
--- /dev/null
+++ b/src/nodejs/unit-http/websocket_router.js
@@ -0,0 +1,157 @@
+/************************************************************************
+ * Copyright 2010-2015 Brian McKelvey.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ***********************************************************************/
+
+var extend = require('./utils').extend;
+var util = require('util');
+var EventEmitter = require('events').EventEmitter;
+var WebSocketRouterRequest = require('./websocket_router_request');
+
+function WebSocketRouter(config) {
+ // Superclass Constructor
+ EventEmitter.call(this);
+
+ this.config = {
+ // The WebSocketServer instance to attach to.
+ server: null
+ };
+ if (config) {
+ extend(this.config, config);
+ }
+ this.handlers = [];
+
+ this._requestHandler = this.handleRequest.bind(this);
+ if (this.config.server) {
+ this.attachServer(this.config.server);
+ }
+}
+
+util.inherits(WebSocketRouter, EventEmitter);
+
+WebSocketRouter.prototype.attachServer = function(server) {
+ if (server) {
+ this.server = server;
+ this.server.on('request', this._requestHandler);
+ }
+ else {
+ throw new Error('You must specify a WebSocketServer instance to attach to.');
+ }
+};
+
+WebSocketRouter.prototype.detachServer = function() {
+ if (this.server) {
+ this.server.removeListener('request', this._requestHandler);
+ this.server = null;
+ }
+ else {
+ throw new Error('Cannot detach from server: not attached.');
+ }
+};
+
+WebSocketRouter.prototype.mount = function(path, protocol, callback) {
+ if (!path) {
+ throw new Error('You must specify a path for this handler.');
+ }
+ if (!protocol) {
+ protocol = '____no_protocol____';
+ }
+ if (!callback) {
+ throw new Error('You must specify a callback for this handler.');
+ }
+
+ path = this.pathToRegExp(path);
+ if (!(path instanceof RegExp)) {
+ throw new Error('Path must be specified as either a string or a RegExp.');
+ }
+ var pathString = path.toString();
+
+ // normalize protocol to lower-case
+ protocol = protocol.toLocaleLowerCase();
+
+ if (this.findHandlerIndex(pathString, protocol) !== -1) {
+ throw new Error('You may only mount one handler per path/protocol combination.');
+ }
+
+ this.handlers.push({
+ 'path': path,
+ 'pathString': pathString,
+ 'protocol': protocol,
+ 'callback': callback
+ });
+};
+WebSocketRouter.prototype.unmount = function(path, protocol) {
+ var index = this.findHandlerIndex(this.pathToRegExp(path).toString(), protocol);
+ if (index !== -1) {
+ this.handlers.splice(index, 1);
+ }
+ else {
+ throw new Error('Unable to find a route matching the specified path and protocol.');
+ }
+};
+
+WebSocketRouter.prototype.findHandlerIndex = function(pathString, protocol) {
+ protocol = protocol.toLocaleLowerCase();
+ for (var i=0, len=this.handlers.length; i < len; i++) {
+ var handler = this.handlers[i];
+ if (handler.pathString === pathString && handler.protocol === protocol) {
+ return i;
+ }
+ }
+ return -1;
+};
+
+WebSocketRouter.prototype.pathToRegExp = function(path) {
+ if (typeof(path) === 'string') {
+ if (path === '*') {
+ path = /^.*$/;
+ }
+ else {
+ path = path.replace(/[-[\]{}()*+?.,\\^$|#\s]/g, '\\$&');
+ path = new RegExp('^' + path + '$');
+ }
+ }
+ return path;
+};
+
+WebSocketRouter.prototype.handleRequest = function(request) {
+ var requestedProtocols = request.requestedProtocols;
+ if (requestedProtocols.length === 0) {
+ requestedProtocols = ['____no_protocol____'];
+ }
+
+ // Find a handler with the first requested protocol first
+ for (var i=0; i < requestedProtocols.length; i++) {
+ var requestedProtocol = requestedProtocols[i].toLocaleLowerCase();
+
+ // find the first handler that can process this request
+ for (var j=0, len=this.handlers.length; j < len; j++) {
+ var handler = this.handlers[j];
+ if (handler.path.test(request.resourceURL.pathname)) {
+ if (requestedProtocol === handler.protocol ||
+ handler.protocol === '*')
+ {
+ var routerRequest = new WebSocketRouterRequest(request, requestedProtocol);
+ handler.callback(routerRequest);
+ return;
+ }
+ }
+ }
+ }
+
+ // If we get here we were unable to find a suitable handler.
+ request.reject(404, 'No handler is available for the given request.');
+};
+
+module.exports = WebSocketRouter;
diff --git a/src/nodejs/unit-http/websocket_router_request.js b/src/nodejs/unit-http/websocket_router_request.js
new file mode 100644
index 00000000..d3e37457
--- /dev/null
+++ b/src/nodejs/unit-http/websocket_router_request.js
@@ -0,0 +1,54 @@
+/************************************************************************
+ * Copyright 2010-2015 Brian McKelvey.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ***********************************************************************/
+
+var util = require('util');
+var EventEmitter = require('events').EventEmitter;
+
+function WebSocketRouterRequest(webSocketRequest, resolvedProtocol) {
+ // Superclass Constructor
+ EventEmitter.call(this);
+
+ this.webSocketRequest = webSocketRequest;
+ if (resolvedProtocol === '____no_protocol____') {
+ this.protocol = null;
+ }
+ else {
+ this.protocol = resolvedProtocol;
+ }
+ this.origin = webSocketRequest.origin;
+ this.resource = webSocketRequest.resource;
+ this.resourceURL = webSocketRequest.resourceURL;
+ this.httpRequest = webSocketRequest.httpRequest;
+ this.remoteAddress = webSocketRequest.remoteAddress;
+ this.webSocketVersion = webSocketRequest.webSocketVersion;
+ this.requestedExtensions = webSocketRequest.requestedExtensions;
+ this.cookies = webSocketRequest.cookies;
+}
+
+util.inherits(WebSocketRouterRequest, EventEmitter);
+
+WebSocketRouterRequest.prototype.accept = function(origin, cookies) {
+ var connection = this.webSocketRequest.accept(this.protocol, origin, cookies);
+ this.emit('requestAccepted', connection);
+ return connection;
+};
+
+WebSocketRouterRequest.prototype.reject = function(status, reason, extraHeaders) {
+ this.webSocketRequest.reject(status, reason, extraHeaders);
+ this.emit('requestRejected', this);
+};
+
+module.exports = WebSocketRouterRequest;
diff --git a/src/nodejs/unit-http/websocket_server.js b/src/nodejs/unit-http/websocket_server.js
new file mode 100644
index 00000000..306f3f67
--- /dev/null
+++ b/src/nodejs/unit-http/websocket_server.js
@@ -0,0 +1,213 @@
+/************************************************************************
+ * Copyright 2010-2015 Brian McKelvey.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ***********************************************************************/
+
+var extend = require('./utils').extend;
+var utils = require('./utils');
+var util = require('util');
+var EventEmitter = require('events').EventEmitter;
+var WebSocketRequest = require('./websocket_request');
+
+var WebSocketServer = function WebSocketServer(config) {
+ // Superclass Constructor
+ EventEmitter.call(this);
+
+ this._handlers = {
+ upgrade: this.handleUpgrade.bind(this),
+ requestAccepted: this.handleRequestAccepted.bind(this),
+ requestResolved: this.handleRequestResolved.bind(this)
+ };
+ this.connections = [];
+ this.pendingRequests = [];
+ if (config) {
+ this.mount(config);
+ }
+};
+
+util.inherits(WebSocketServer, EventEmitter);
+
+WebSocketServer.prototype.mount = function(config) {
+ this.config = {
+ // The http server instance to attach to. Required.
+ httpServer: null,
+
+ // 64KiB max frame size.
+ maxReceivedFrameSize: 0x10000,
+
+ // 1MiB max message size, only applicable if
+ // assembleFragments is true
+ maxReceivedMessageSize: 0x100000,
+
+ // Outgoing messages larger than fragmentationThreshold will be
+ // split into multiple fragments.
+ fragmentOutgoingMessages: true,
+
+ // Outgoing frames are fragmented if they exceed this threshold.
+ // Default is 16KiB
+ fragmentationThreshold: 0x4000,
+
+ // If true, fragmented messages will be automatically assembled
+ // and the full message will be emitted via a 'message' event.
+ // If false, each frame will be emitted via a 'frame' event and
+ // the application will be responsible for aggregating multiple
+ // fragmented frames. Single-frame messages will emit a 'message'
+ // event in addition to the 'frame' event.
+ // Most users will want to leave this set to 'true'
+ assembleFragments: true,
+
+ // If this is true, websocket connections will be accepted
+ // regardless of the path and protocol specified by the client.
+ // The protocol accepted will be the first that was requested
+ // by the client. Clients from any origin will be accepted.
+ // This should only be used in the simplest of cases. You should
+ // probably leave this set to 'false' and inspect the request
+ // object to make sure it's acceptable before accepting it.
+ autoAcceptConnections: false,
+
+ // Whether or not the X-Forwarded-For header should be respected.
+ // It's important to set this to 'true' when accepting connections
+ // from untrusted clients, as a malicious client could spoof its
+ // IP address by simply setting this header. It's meant to be added
+ // by a trusted proxy or other intermediary within your own
+ // infrastructure.
+ // See: http://en.wikipedia.org/wiki/X-Forwarded-For
+ ignoreXForwardedFor: false,
+
+ // The Nagle Algorithm makes more efficient use of network resources
+ // by introducing a small delay before sending small packets so that
+ // multiple messages can be batched together before going onto the
+ // wire. This however comes at the cost of latency, so the default
+ // is to disable it. If you don't need low latency and are streaming
+ // lots of small messages, you can change this to 'false'
+ disableNagleAlgorithm: true,
+
+ // The number of milliseconds to wait after sending a close frame
+ // for an acknowledgement to come back before giving up and just
+ // closing the socket.
+ closeTimeout: 5000
+ };
+ extend(this.config, config);
+
+ if (this.config.httpServer) {
+ if (!Array.isArray(this.config.httpServer)) {
+ this.config.httpServer = [this.config.httpServer];
+ }
+ var upgradeHandler = this._handlers.upgrade;
+ this.config.httpServer.forEach(function(httpServer) {
+ httpServer.on('upgrade', upgradeHandler);
+ });
+ }
+ else {
+ throw new Error('You must specify an httpServer on which to mount the WebSocket server.');
+ }
+};
+
+WebSocketServer.prototype.unmount = function() {
+ var upgradeHandler = this._handlers.upgrade;
+ this.config.httpServer.forEach(function(httpServer) {
+ httpServer.removeListener('upgrade', upgradeHandler);
+ });
+};
+
+WebSocketServer.prototype.closeAllConnections = function() {
+ this.connections.forEach(function(connection) {
+ connection.close();
+ });
+ this.pendingRequests.forEach(function(request) {
+ process.nextTick(function() {
+ request.reject(503); // HTTP 503 Service Unavailable
+ });
+ });
+};
+
+WebSocketServer.prototype.broadcast = function(data) {
+ if (Buffer.isBuffer(data)) {
+ this.broadcastBytes(data);
+ }
+ else if (typeof(data.toString) === 'function') {
+ this.broadcastUTF(data);
+ }
+};
+
+WebSocketServer.prototype.broadcastUTF = function(utfData) {
+ this.connections.forEach(function(connection) {
+ connection.sendUTF(utfData);
+ });
+};
+
+WebSocketServer.prototype.broadcastBytes = function(binaryData) {
+ this.connections.forEach(function(connection) {
+ connection.sendBytes(binaryData);
+ });
+};
+
+WebSocketServer.prototype.shutDown = function() {
+ this.unmount();
+ this.closeAllConnections();
+};
+
+WebSocketServer.prototype.handleUpgrade = function(request, socket) {
+ var wsRequest = new WebSocketRequest(socket, request, this.config);
+ try {
+ wsRequest.readHandshake();
+ }
+ catch(e) {
+ wsRequest.reject(
+ e.httpCode ? e.httpCode : 400,
+ e.message,
+ e.headers
+ );
+ return;
+ }
+
+ this.pendingRequests.push(wsRequest);
+
+ wsRequest.once('requestAccepted', this._handlers.requestAccepted);
+ wsRequest.once('requestResolved', this._handlers.requestResolved);
+
+ if (!this.config.autoAcceptConnections && utils.eventEmitterListenerCount(this, 'request') > 0) {
+ this.emit('request', wsRequest);
+ }
+ else if (this.config.autoAcceptConnections) {
+ wsRequest.accept(wsRequest.requestedProtocols[0], wsRequest.origin);
+ }
+ else {
+ wsRequest.reject(404, 'No handler is configured to accept the connection.');
+ }
+};
+
+WebSocketServer.prototype.handleRequestAccepted = function(connection) {
+ var self = this;
+ connection.once('close', function(closeReason, description) {
+ self.handleConnectionClose(connection, closeReason, description);
+ });
+ this.connections.push(connection);
+ this.emit('connect', connection);
+};
+
+WebSocketServer.prototype.handleConnectionClose = function(connection, closeReason, description) {
+ var index = this.connections.indexOf(connection);
+ if (index !== -1) {
+ this.connections.splice(index, 1);
+ }
+ this.emit('close', connection, closeReason, description);
+};
+
+WebSocketServer.prototype.handleRequestResolved = function(request) {
+ var index = this.pendingRequests.indexOf(request);
+ if (index !== -1) { this.pendingRequests.splice(index, 1); }
+};
+
+module.exports = WebSocketServer;