diff options
author | Max Romanov <max.romanov@nginx.com> | 2019-08-20 16:32:05 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2019-08-20 16:32:05 +0300 |
commit | e291841b3379f8787a10ad4f91e4aeae2ae323a4 (patch) | |
tree | 49a2f4629e5b8d6cd48f7436d7eeba4c99905675 /src/nodejs | |
parent | e501c74ddceab86e48c031ca9b5e154f52dcdae0 (diff) | |
download | unit-e291841b3379f8787a10ad4f91e4aeae2ae323a4.tar.gz unit-e291841b3379f8787a10ad4f91e4aeae2ae323a4.tar.bz2 |
Node.js: introducing websocket support.
Diffstat (limited to 'src/nodejs')
-rw-r--r--[-rwxr-xr-x] | src/nodejs/unit-http/http.js | 0 | ||||
-rw-r--r--[-rwxr-xr-x] | src/nodejs/unit-http/http_server.js | 76 | ||||
-rw-r--r-- | src/nodejs/unit-http/nxt_napi.h | 195 | ||||
-rw-r--r-- | src/nodejs/unit-http/package.json | 9 | ||||
-rw-r--r--[-rwxr-xr-x] | src/nodejs/unit-http/socket.js | 8 | ||||
-rw-r--r-- | src/nodejs/unit-http/unit.cpp | 517 | ||||
-rw-r--r-- | src/nodejs/unit-http/unit.h | 29 | ||||
-rw-r--r-- | src/nodejs/unit-http/utils.js | 73 | ||||
-rw-r--r-- | src/nodejs/unit-http/websocket.js | 14 | ||||
-rw-r--r-- | src/nodejs/unit-http/websocket_connection.js | 683 | ||||
-rw-r--r-- | src/nodejs/unit-http/websocket_frame.js | 11 | ||||
-rw-r--r-- | src/nodejs/unit-http/websocket_request.js | 509 | ||||
-rw-r--r-- | src/nodejs/unit-http/websocket_router.js | 157 | ||||
-rw-r--r-- | src/nodejs/unit-http/websocket_router_request.js | 54 | ||||
-rw-r--r-- | src/nodejs/unit-http/websocket_server.js | 213 |
15 files changed, 2373 insertions, 175 deletions
diff --git a/src/nodejs/unit-http/http.js b/src/nodejs/unit-http/http.js index 3a25fa2f..3a25fa2f 100755..100644 --- a/src/nodejs/unit-http/http.js +++ b/src/nodejs/unit-http/http.js diff --git a/src/nodejs/unit-http/http_server.js b/src/nodejs/unit-http/http_server.js index 0fe5dd34..c42149a5 100755..100644 --- a/src/nodejs/unit-http/http_server.js +++ b/src/nodejs/unit-http/http_server.js @@ -8,16 +8,21 @@ const EventEmitter = require('events'); const http = require('http'); const util = require('util'); -const unit_lib = require('unit-http/build/Release/unit-http.node'); -const unit_socket = require('unit-http/socket'); - -const { Socket } = unit_socket; +const unit_lib = require('./build/Release/unit-http'); +const Socket = require('./socket'); +const WebSocketFrame = require('./websocket_frame'); function ServerResponse(req) { EventEmitter.call(this); this.headers = {}; + + this.server = req.server; + this._request = req; + req._response = this; + this.socket = req.socket; + this.connection = req.connection; } util.inherits(ServerResponse, EventEmitter); @@ -207,15 +212,23 @@ ServerResponse.prototype._implicitHeader = function _implicitHeader() { this.writeHead(this.statusCode); }; -ServerResponse.prototype._writeBody = function(chunk, encoding, callback) { - var contentLength = 0; +ServerResponse.prototype._send_headers = unit_lib.response_send_headers; +ServerResponse.prototype._sendHeaders = function _sendHeaders() { if (!this.headersSent) { - unit_lib.unit_response_headers(this, this.statusCode, this.headers, - this.headers_count, this.headers_len); + this._send_headers(this.statusCode, this.headers, this.headers_count, + this.headers_len); this.headersSent = true; } +}; + +ServerResponse.prototype._write = unit_lib.response_write; + +ServerResponse.prototype._writeBody = function(chunk, encoding, callback) { + var contentLength = 0; + + this._sendHeaders(); if (typeof chunk === 'function') { callback = chunk; @@ -238,7 +251,7 @@ ServerResponse.prototype._writeBody = function(chunk, encoding, callback) { contentLength = chunk.length; } - unit_lib.unit_response_write(this, chunk, contentLength); + this._write(chunk, contentLength); } if (typeof callback === 'function') { @@ -268,11 +281,13 @@ ServerResponse.prototype.write = function write(chunk, encoding, callback) { return true; }; +ServerResponse.prototype._end = unit_lib.response_end; + ServerResponse.prototype.end = function end(chunk, encoding, callback) { if (!this.finished) { this._writeBody(chunk, encoding, callback); - unit_lib.unit_response_end(this); + this._end(); this.finished = true; } @@ -280,10 +295,12 @@ ServerResponse.prototype.end = function end(chunk, encoding, callback) { return this; }; -function ServerRequest(server) { +function ServerRequest(server, socket) { EventEmitter.call(this); this.server = server; + this.socket = socket; + this.connection = socket; } util.inherits(ServerRequest, EventEmitter); @@ -339,8 +356,8 @@ ServerRequest.prototype.on = function on(ev, fn) { if (ev === "data") { process.nextTick(function () { - if (this.server.buffer.length !== 0) { - this.emit("data", this.server.buffer); + if (this._data.length !== 0) { + this.emit("data", this._data); } }.bind(this)); @@ -357,14 +374,27 @@ function Server(requestListener) { this.unit.createServer(); - this.socket = Socket; - this.request = ServerRequest; - this.response = ServerResponse; + this.Socket = Socket; + this.ServerRequest = ServerRequest; + this.ServerResponse = ServerResponse; + this.WebSocketFrame = WebSocketFrame; if (requestListener) { this.on('request', requestListener); } + + this._upgradeListenerCount = 0; + this.on('newListener', function(ev) { + if (ev === 'upgrade'){ + this._upgradeListenerCount++; + } + }).on('removeListener', function(ev) { + if (ev === 'upgrade') { + this._upgradeListenerCount--; + } + }); } + util.inherits(Server, EventEmitter); Server.prototype.setTimeout = function setTimeout(msecs, callback) { @@ -381,15 +411,13 @@ Server.prototype.listen = function () { this.unit.listen(); }; -Server.prototype.emit_events = function (server, req, res) { - req.server = server; - res.server = server; - req.res = res; - res.req = req; - - server.buffer = server.unit._read(req.socket.req_pointer); +Server.prototype.emit_request = function (req, res) { + if (req._websocket_handshake && this._upgradeListenerCount > 0) { + this.emit('upgrade', req, req.socket); - server.emit("request", req, res); + } else { + this.emit("request", req, res); + } process.nextTick(() => { req.emit("finish"); diff --git a/src/nodejs/unit-http/nxt_napi.h b/src/nodejs/unit-http/nxt_napi.h index 9bcf3a21..d9721a40 100644 --- a/src/nodejs/unit-http/nxt_napi.h +++ b/src/nodejs/unit-http/nxt_napi.h @@ -188,6 +188,21 @@ struct nxt_napi { } + inline void * + get_buffer_info(napi_value val, size_t &size) + { + void *res; + napi_status status; + + status = napi_get_buffer_info(env_, val, &res, &size); + if (status != napi_ok) { + throw exception("Failed to get buffer info"); + } + + return res; + } + + inline napi_value get_cb_info(napi_callback_info info, size_t &argc, napi_value *argv) { @@ -219,6 +234,23 @@ struct nxt_napi { inline napi_value + get_cb_info(napi_callback_info info, napi_value &arg) + { + size_t argc; + napi_value res; + + argc = 1; + res = get_cb_info(info, argc, &arg); + + if (argc != 1) { + throw exception("Wrong args count. Expected 1"); + } + + return res; + } + + + inline napi_value get_element(napi_value obj, uint32_t i) { napi_value res; @@ -311,15 +343,22 @@ struct nxt_napi { inline nxt_unit_request_info_t * get_request_info(napi_value obj) { - int64_t n; + return (nxt_unit_request_info_t *) unwrap(obj); + } + + + inline uint32_t + get_value_bool(napi_value obj) + { + bool res; napi_status status; - status = napi_get_value_int64(env_, obj, &n); + status = napi_get_value_bool(env_, obj, &res); if (status != napi_ok) { - throw exception("Failed to get request pointer"); + throw exception("Failed to get bool"); } - return (nxt_unit_request_info_t *) (intptr_t) n; + return res; } @@ -353,6 +392,21 @@ struct nxt_napi { } + inline size_t + get_value_string_utf8(napi_value val, char *buf, size_t bufsize) + { + size_t res; + napi_status status; + + status = napi_get_value_string_utf8(env_, val, buf, bufsize, &res); + if (status != napi_ok) { + throw exception("Failed to get string utf8"); + } + + return res; + } + + inline bool is_array(napi_value val) { @@ -368,6 +422,21 @@ struct nxt_napi { } + inline bool + is_buffer(napi_value val) + { + bool res; + napi_status status; + + status = napi_is_buffer(env_, val, &res); + if (status != napi_ok) { + throw exception("Failed to confirm value is buffer"); + } + + return res; + } + + inline napi_value make_callback(napi_async_context ctx, napi_value val, napi_value func, int argc, const napi_value *argv) @@ -398,6 +467,41 @@ struct nxt_napi { inline napi_value + make_callback(napi_async_context ctx, napi_value val, napi_value func) + { + return make_callback(ctx, val, func, 0, NULL); + } + + + inline napi_value + make_callback(napi_async_context ctx, napi_value val, napi_value func, + napi_value arg1) + { + return make_callback(ctx, val, func, 1, &arg1); + } + + + inline napi_value + make_callback(napi_async_context ctx, napi_value val, napi_value func, + napi_value arg1, napi_value arg2) + { + napi_value args[2] = { arg1, arg2 }; + + return make_callback(ctx, val, func, 2, args); + } + + + inline napi_value + make_callback(napi_async_context ctx, napi_value val, napi_value func, + napi_value arg1, napi_value arg2, napi_value arg3) + { + napi_value args[3] = { arg1, arg2, arg3 }; + + return make_callback(ctx, val, func, 3, args); + } + + + inline napi_value new_instance(napi_value ctor) { napi_value res; @@ -427,6 +531,22 @@ struct nxt_napi { } + inline napi_value + new_instance(napi_value ctor, napi_value param1, napi_value param2) + { + napi_value res; + napi_status status; + napi_value param[2] = { param1, param2 }; + + status = napi_new_instance(env_, ctor, 2, param, &res); + if (status != napi_ok) { + throw exception("Failed to create instance"); + } + + return res; + } + + inline void set_element(napi_value obj, uint32_t i, napi_value val) { @@ -472,8 +592,46 @@ struct nxt_napi { } + template<typename T> inline void - set_named_property(napi_value obj, const char *name, intptr_t val) + set_named_property(napi_value obj, const char *name, T val) + { + set_named_property(obj, name, create(val)); + } + + + inline napi_value + create(int32_t val) + { + napi_value ptr; + napi_status status; + + status = napi_create_int32(env_, val, &ptr); + if (status != napi_ok) { + throw exception("Failed to create int32"); + } + + return ptr; + } + + + inline napi_value + create(uint32_t val) + { + napi_value ptr; + napi_status status; + + status = napi_create_uint32(env_, val, &ptr); + if (status != napi_ok) { + throw exception("Failed to create uint32"); + } + + return ptr; + } + + + inline napi_value + create(int64_t val) { napi_value ptr; napi_status status; @@ -483,7 +641,32 @@ struct nxt_napi { throw exception("Failed to create int64"); } - set_named_property(obj, name, ptr); + return ptr; + } + + + inline void + remove_wrap(napi_ref& ref) + { + if (ref != nullptr) { + remove_wrap(get_reference_value(ref)); + ref = nullptr; + } + } + + + inline void * + remove_wrap(napi_value val) + { + void *res; + napi_status status; + + status = napi_remove_wrap(env_, val, &res); + if (status != napi_ok) { + throw exception("Failed to remove_wrap"); + } + + return res; } diff --git a/src/nodejs/unit-http/package.json b/src/nodejs/unit-http/package.json index 6a6c00b4..7ee01346 100644 --- a/src/nodejs/unit-http/package.json +++ b/src/nodejs/unit-http/package.json @@ -14,7 +14,14 @@ "package.json", "socket.js", "binding.gyp", - "README.md" + "README.md", + "websocket.js", + "websocket_connection.js", + "websocket_frame.js", + "websocket_request.js", + "websocket_router.js", + "websocket_router_request.js", + "websocket_server.js" ], "scripts": { "clean": "node-gyp clean", diff --git a/src/nodejs/unit-http/socket.js b/src/nodejs/unit-http/socket.js index 6e836949..b1a3abb8 100755..100644 --- a/src/nodejs/unit-http/socket.js +++ b/src/nodejs/unit-http/socket.js @@ -7,7 +7,7 @@ const EventEmitter = require('events'); const util = require('util'); -const unit_lib = require('unit-http/build/Release/unit-http.node'); +const unit_lib = require('./build/Release/unit-http'); function Socket(options) { EventEmitter.call(this); @@ -89,7 +89,7 @@ Socket.prototype.setTimeout = function setTimeout(timeout, callback) { this.timeout = timeout; - this.on('timeout', callback); + // this.on('timeout', callback); return this; }; @@ -101,6 +101,4 @@ Socket.prototype.write = function write(data, encoding, callback) { }; -module.exports = { - Socket -}; +module.exports = Socket; diff --git a/src/nodejs/unit-http/unit.cpp b/src/nodejs/unit-http/unit.cpp index 3f66189a..ac10024c 100644 --- a/src/nodejs/unit-http/unit.cpp +++ b/src/nodejs/unit-http/unit.cpp @@ -10,6 +10,8 @@ #include <uv.h> +#include <nxt_unit_websocket.h> + napi_ref Unit::constructor_; @@ -20,17 +22,27 @@ struct nxt_nodejs_ctx_t { }; +struct req_data_t { + napi_ref sock_ref; + napi_ref resp_ref; + napi_ref conn_ref; +}; + + Unit::Unit(napi_env env, napi_value jsthis): nxt_napi(env), wrapper_(wrap(jsthis, this, destroy)), unit_ctx_(nullptr) { + nxt_unit_debug(NULL, "Unit::Unit()"); } Unit::~Unit() { delete_reference(wrapper_); + + nxt_unit_debug(NULL, "Unit::~Unit()"); } @@ -38,23 +50,26 @@ napi_value Unit::init(napi_env env, napi_value exports) { nxt_napi napi(env); - napi_value cons; + napi_value ctor; - napi_property_descriptor properties[] = { + napi_property_descriptor unit_props[] = { { "createServer", 0, create_server, 0, 0, 0, napi_default, 0 }, { "listen", 0, listen, 0, 0, 0, napi_default, 0 }, - { "_read", 0, _read, 0, 0, 0, napi_default, 0 } }; try { - cons = napi.define_class("Unit", create, 3, properties); - constructor_ = napi.create_reference(cons); + ctor = napi.define_class("Unit", create, 2, unit_props); + constructor_ = napi.create_reference(ctor); - napi.set_named_property(exports, "Unit", cons); - napi.set_named_property(exports, "unit_response_headers", + napi.set_named_property(exports, "Unit", ctor); + napi.set_named_property(exports, "response_send_headers", response_send_headers); - napi.set_named_property(exports, "unit_response_write", response_write); - napi.set_named_property(exports, "unit_response_end", response_end); + napi.set_named_property(exports, "response_write", response_write); + napi.set_named_property(exports, "response_end", response_end); + napi.set_named_property(exports, "websocket_send_frame", + websocket_send_frame); + napi.set_named_property(exports, "websocket_set_sock", + websocket_set_sock); } catch (exception &e) { napi.throw_error(e); @@ -78,7 +93,7 @@ napi_value Unit::create(napi_env env, napi_callback_info info) { nxt_napi napi(env); - napi_value target, cons, instance, jsthis; + napi_value target, ctor, instance, jsthis; try { target = napi.get_new_target(info); @@ -94,8 +109,8 @@ Unit::create(napi_env env, napi_callback_info info) } /* Invoked as plain function `Unit(...)`, turn into construct call. */ - cons = napi.get_reference_value(constructor_); - instance = napi.new_instance(cons); + ctor = napi.get_reference_value(constructor_); + instance = napi.new_instance(ctor); napi.create_reference(instance); } catch (exception &e) { @@ -130,10 +145,14 @@ Unit::create_server(napi_env env, napi_callback_info info) memset(&unit_init, 0, sizeof(nxt_unit_init_t)); unit_init.data = obj; - unit_init.callbacks.request_handler = request_handler; - unit_init.callbacks.add_port = add_port; - unit_init.callbacks.remove_port = remove_port; - unit_init.callbacks.quit = quit; + unit_init.callbacks.request_handler = request_handler_cb; + unit_init.callbacks.websocket_handler = websocket_handler_cb; + unit_init.callbacks.close_handler = close_handler_cb; + unit_init.callbacks.add_port = add_port; + unit_init.callbacks.remove_port = remove_port; + unit_init.callbacks.quit = quit_cb; + + unit_init.request_data_size = sizeof(req_data_t); obj->unit_ctx_ = nxt_unit_init(&unit_init); if (obj->unit_ctx_ == NULL) { @@ -157,74 +176,139 @@ Unit::listen(napi_env env, napi_callback_info info) } -napi_value -Unit::_read(napi_env env, napi_callback_info info) +void +Unit::request_handler_cb(nxt_unit_request_info_t *req) { - void *data; - size_t argc; - nxt_napi napi(env); - napi_value buffer, argv; - nxt_unit_request_info_t *req; + Unit *obj; - argc = 1; + obj = reinterpret_cast<Unit *>(req->unit->data); + + obj->request_handler(req); +} + + +void +Unit::request_handler(nxt_unit_request_info_t *req) +{ + napi_value socket, request, response, server_obj, emit_request; + + memset(req->data, 0, sizeof(req_data_t)); try { - napi.get_cb_info(info, argc, &argv); + nxt_handle_scope scope(env()); + + server_obj = get_server_object(); + + socket = create_socket(server_obj, req); + request = create_request(server_obj, socket); + response = create_response(server_obj, request, req); + + create_headers(req, request); - req = napi.get_request_info(argv); - buffer = napi.create_buffer((size_t) req->content_length, &data); + emit_request = get_named_property(server_obj, "emit_request"); + + nxt_async_context async_context(env(), "request_handler"); + nxt_callback_scope async_scope(async_context); + + make_callback(async_context, server_obj, emit_request, request, + response); } catch (exception &e) { - napi.throw_error(e); - return nullptr; + nxt_unit_req_warn(req, "request_handler: %s", e.str); } +} - nxt_unit_request_read(req, data, req->content_length); - return buffer; +void +Unit::websocket_handler_cb(nxt_unit_websocket_frame_t *ws) +{ + Unit *obj; + + obj = reinterpret_cast<Unit *>(ws->req->unit->data); + + obj->websocket_handler(ws); } void -Unit::request_handler(nxt_unit_request_info_t *req) +Unit::websocket_handler(nxt_unit_websocket_frame_t *ws) { - Unit *obj; - napi_value socket, request, response, server_obj; - napi_value emit_events; - napi_value events_args[3]; + napi_value frame, server_obj, process_frame, conn; + req_data_t *req_data; - obj = reinterpret_cast<Unit *>(req->unit->data); + req_data = (req_data_t *) ws->req->data; try { - nxt_handle_scope scope(obj->env()); - - server_obj = obj->get_server_object(); + nxt_handle_scope scope(env()); - socket = obj->create_socket(server_obj, req); - request = obj->create_request(server_obj, socket); - response = obj->create_response(server_obj, socket, request, req); + server_obj = get_server_object(); - obj->create_headers(req, request); + frame = create_websocket_frame(server_obj, ws); - emit_events = obj->get_named_property(server_obj, "emit_events"); + conn = get_reference_value(req_data->conn_ref); - events_args[0] = server_obj; - events_args[1] = request; - events_args[2] = response; + process_frame = get_named_property(conn, "processFrame"); - nxt_async_context async_context(obj->env(), "unit_request_handler"); + nxt_async_context async_context(env(), "websocket_handler"); nxt_callback_scope async_scope(async_context); - obj->make_callback(async_context, server_obj, emit_events, - 3, events_args); + make_callback(async_context, conn, process_frame, frame); } catch (exception &e) { - obj->throw_error(e); + nxt_unit_req_warn(ws->req, "websocket_handler: %s", e.str); } + + nxt_unit_websocket_done(ws); +} + + +void +Unit::close_handler_cb(nxt_unit_request_info_t *req) +{ + Unit *obj; + + obj = reinterpret_cast<Unit *>(req->unit->data); + + obj->close_handler(req); } void +Unit::close_handler(nxt_unit_request_info_t *req) +{ + napi_value conn_handle_close, conn; + req_data_t *req_data; + + req_data = (req_data_t *) req->data; + + try { + nxt_handle_scope scope(env()); + + conn = get_reference_value(req_data->conn_ref); + + conn_handle_close = get_named_property(conn, "handleSocketClose"); + + nxt_async_context async_context(env(), "close_handler"); + nxt_callback_scope async_scope(async_context); + + make_callback(async_context, conn, conn_handle_close, + nxt_napi::create(0)); + + remove_wrap(req_data->sock_ref); + remove_wrap(req_data->resp_ref); + remove_wrap(req_data->conn_ref); + + } catch (exception &e) { + nxt_unit_req_warn(req, "close_handler: %s", e.str); + + return; + } + + nxt_unit_request_done(req, NXT_UNIT_OK); +} + + +static void nxt_uv_read_callback(uv_poll_t *handle, int status, int events) { nxt_unit_run_once((nxt_unit_ctx_t *) handle->data); @@ -244,14 +328,14 @@ Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) obj = reinterpret_cast<Unit *>(ctx->unit->data); if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) { - obj->throw_error("Failed to upgrade read" - " file descriptor to O_NONBLOCK"); + nxt_unit_warn(ctx, "fcntl(%d, O_NONBLOCK) failed: %s (%d)", + port->in_fd, strerror(errno), errno); return -1; } status = napi_get_uv_event_loop(obj->env(), &loop); if (status != napi_ok) { - obj->throw_error("Failed to get uv.loop"); + nxt_unit_warn(ctx, "Failed to get uv.loop"); return NXT_UNIT_ERROR; } @@ -259,13 +343,13 @@ Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) err = uv_poll_init(loop, &node_ctx->poll, port->in_fd); if (err < 0) { - obj->throw_error("Failed to init uv.poll"); + nxt_unit_warn(ctx, "Failed to init uv.poll"); return NXT_UNIT_ERROR; } err = uv_poll_start(&node_ctx->poll, UV_READABLE, nxt_uv_read_callback); if (err < 0) { - obj->throw_error("Failed to start uv.poll"); + nxt_unit_warn(ctx, "Failed to start uv.poll"); return NXT_UNIT_ERROR; } @@ -308,27 +392,35 @@ Unit::remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) void -Unit::quit(nxt_unit_ctx_t *ctx) +Unit::quit_cb(nxt_unit_ctx_t *ctx) { - Unit *obj; - napi_value server_obj, emit_close; + Unit *obj; obj = reinterpret_cast<Unit *>(ctx->unit->data); + obj->quit(ctx); +} + + +void +Unit::quit(nxt_unit_ctx_t *ctx) +{ + napi_value server_obj, emit_close; + try { - nxt_handle_scope scope(obj->env()); + nxt_handle_scope scope(env()); - server_obj = obj->get_server_object(); + server_obj = get_server_object(); - emit_close = obj->get_named_property(server_obj, "emit_close"); + emit_close = get_named_property(server_obj, "emit_close"); - nxt_async_context async_context(obj->env(), "unit_quit"); + nxt_async_context async_context(env(), "unit_quit"); nxt_callback_scope async_scope(async_context); - obj->make_callback(async_context, server_obj, emit_close, 0, NULL); + make_callback(async_context, server_obj, emit_close); } catch (exception &e) { - obj->throw_error(e); + nxt_unit_debug(ctx, "quit: %s", e.str); } nxt_unit_done(ctx); @@ -349,8 +441,9 @@ Unit::get_server_object() void Unit::create_headers(nxt_unit_request_info_t *req, napi_value request) { + void *data; uint32_t i; - napi_value headers, raw_headers; + napi_value headers, raw_headers, buffer; napi_status status; nxt_unit_request_t *r; @@ -373,6 +466,13 @@ Unit::create_headers(nxt_unit_request_info_t *req, napi_value request) set_named_property(request, "httpVersion", r->version, r->version_length); set_named_property(request, "method", r->method, r->method_length); set_named_property(request, "url", r->target, r->target_length); + + set_named_property(request, "_websocket_handshake", r->websocket_handshake); + + buffer = create_buffer((size_t) req->content_length, &data); + nxt_unit_request_read(req, data, req->content_length); + + set_named_property(request, "_data", buffer); } @@ -410,15 +510,18 @@ napi_value Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req) { napi_value constructor, res; + req_data_t *req_data; nxt_unit_request_t *r; r = req->request; - constructor = get_named_property(server_obj, "socket"); + constructor = get_named_property(server_obj, "Socket"); res = new_instance(constructor); - set_named_property(res, "req_pointer", (intptr_t) req); + req_data = (req_data_t *) req->data; + req_data->sock_ref = wrap(res, req, sock_destroy); + set_named_property(res, "remoteAddress", r->remote, r->remote_length); set_named_property(res, "localAddress", r->local, r->local_length); @@ -429,34 +532,66 @@ Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req) napi_value Unit::create_request(napi_value server_obj, napi_value socket) { - napi_value constructor, return_val; + napi_value constructor; - constructor = get_named_property(server_obj, "request"); + constructor = get_named_property(server_obj, "ServerRequest"); - return_val = new_instance(constructor, server_obj); + return new_instance(constructor, server_obj, socket); +} - set_named_property(return_val, "socket", socket); - set_named_property(return_val, "connection", socket); - return return_val; +napi_value +Unit::create_response(napi_value server_obj, napi_value request, + nxt_unit_request_info_t *req) +{ + napi_value constructor, res; + req_data_t *req_data; + + constructor = get_named_property(server_obj, "ServerResponse"); + + res = new_instance(constructor, request); + + req_data = (req_data_t *) req->data; + req_data->resp_ref = wrap(res, req, resp_destroy); + + return res; } napi_value -Unit::create_response(napi_value server_obj, napi_value socket, - napi_value request, nxt_unit_request_info_t *req) +Unit::create_websocket_frame(napi_value server_obj, + nxt_unit_websocket_frame_t *ws) { - napi_value constructor, return_val; + void *data; + napi_value constructor, res, buffer; + uint8_t sc[2]; + + constructor = get_named_property(server_obj, "WebSocketFrame"); - constructor = get_named_property(server_obj, "response"); + res = new_instance(constructor); - return_val = new_instance(constructor, request); + set_named_property(res, "fin", (bool) ws->header->fin); + set_named_property(res, "opcode", ws->header->opcode); + set_named_property(res, "length", (int64_t) ws->payload_len); - set_named_property(return_val, "socket", socket); - set_named_property(return_val, "connection", socket); - set_named_property(return_val, "_req_point", (intptr_t) req); + if (ws->header->opcode == NXT_WEBSOCKET_OP_CLOSE) { + if (ws->payload_len >= 2) { + nxt_unit_websocket_read(ws, sc, 2); - return return_val; + set_named_property(res, "closeStatus", + (((uint16_t) sc[0]) << 8) | sc[1]); + + } else { + set_named_property(res, "closeStatus", -1); + } + } + + buffer = create_buffer((size_t) ws->content_length, &data); + nxt_unit_websocket_read(ws, data, ws->content_length); + + set_named_property(res, "binaryPayload", buffer); + + return res; } @@ -472,35 +607,32 @@ Unit::response_send_headers(napi_env env, napi_callback_info info) uint16_t hash; nxt_napi napi(env); napi_value this_arg, headers, keys, name, value, array_val; - napi_value req_num, array_entry; + napi_value array_entry; napi_valuetype val_type; nxt_unit_field_t *f; nxt_unit_request_info_t *req; - napi_value argv[5]; + napi_value argv[4]; - argc = 5; + argc = 4; try { this_arg = napi.get_cb_info(info, argc, argv); - if (argc != 5) { + if (argc != 4) { napi.throw_error("Wrong args count. Expected: " "statusCode, headers, headers count, " "headers length"); return nullptr; } - req_num = napi.get_named_property(argv[0], "_req_point"); - - req = napi.get_request_info(req_num); - - status_code = napi.get_value_uint32(argv[1]); - keys_count = napi.get_value_uint32(argv[3]); - header_len = napi.get_value_uint32(argv[4]); + req = napi.get_request_info(this_arg); + status_code = napi.get_value_uint32(argv[0]); + keys_count = napi.get_value_uint32(argv[2]); + header_len = napi.get_value_uint32(argv[3]); /* Need to reserve extra byte for C-string 0-termination. */ header_len++; - headers = argv[2]; + headers = argv[1]; ret = nxt_unit_response_init(req, status_code, keys_count, header_len); if (ret != NXT_UNIT_OK) { @@ -611,65 +743,151 @@ napi_value Unit::response_write(napi_env env, napi_callback_info info) { int ret; - char *ptr; + void *ptr; size_t argc, have_buf_len; uint32_t buf_len; nxt_napi napi(env); - napi_value this_arg, req_num; - napi_status status; + napi_value this_arg; nxt_unit_buf_t *buf; napi_valuetype buf_type; nxt_unit_request_info_t *req; - napi_value argv[3]; + napi_value argv[2]; - argc = 3; + argc = 2; try { this_arg = napi.get_cb_info(info, argc, argv); - if (argc != 3) { + if (argc != 2) { throw exception("Wrong args count. Expected: " "chunk, chunk length"); } - req_num = napi.get_named_property(argv[0], "_req_point"); - req = napi.get_request_info(req_num); + req = napi.get_request_info(this_arg); + buf_type = napi.type_of(argv[0]); + buf_len = napi.get_value_uint32(argv[1]) + 1; + + buf = nxt_unit_response_buf_alloc(req, buf_len); + if (buf == NULL) { + throw exception("Failed to allocate response buffer"); + } + + if (buf_type == napi_string) { + /* TODO: will work only for utf8 content-type */ + + have_buf_len = napi.get_value_string_utf8(argv[0], buf->free, + buf_len); - buf_len = napi.get_value_uint32(argv[2]); + } else { + ptr = napi.get_buffer_info(argv[0], have_buf_len); - buf_type = napi.type_of(argv[1]); + memcpy(buf->free, ptr, have_buf_len); + } + buf->free += have_buf_len; + + ret = nxt_unit_buf_send(buf); + if (ret != NXT_UNIT_OK) { + throw exception("Failed to send body buf"); + } } catch (exception &e) { napi.throw_error(e); return nullptr; } - buf_len++; + return this_arg; +} - buf = nxt_unit_response_buf_alloc(req, buf_len); - if (buf == NULL) { - goto failed; - } - if (buf_type == napi_string) { - /* TODO: will work only for utf8 content-type */ +napi_value +Unit::response_end(napi_env env, napi_callback_info info) +{ + nxt_napi napi(env); + napi_value this_arg; + req_data_t *req_data; + nxt_unit_request_info_t *req; + + try { + this_arg = napi.get_cb_info(info); + + req = napi.get_request_info(this_arg); - status = napi_get_value_string_utf8(env, argv[1], buf->free, - buf_len, &have_buf_len); + req_data = (req_data_t *) req->data; - } else { - status = napi_get_buffer_info(env, argv[1], (void **) &ptr, - &have_buf_len); + napi.remove_wrap(req_data->sock_ref); + napi.remove_wrap(req_data->resp_ref); + napi.remove_wrap(req_data->conn_ref); - memcpy(buf->free, ptr, have_buf_len); + } catch (exception &e) { + napi.throw_error(e); + return nullptr; } - if (status != napi_ok) { - goto failed; + nxt_unit_request_done(req, NXT_UNIT_OK); + + return this_arg; +} + + +napi_value +Unit::websocket_send_frame(napi_env env, napi_callback_info info) +{ + int ret, iovec_len; + bool fin; + size_t buf_len; + uint32_t opcode, sc; + nxt_napi napi(env); + napi_value this_arg, frame, payload; + nxt_unit_request_info_t *req; + char status_code[2]; + struct iovec iov[2]; + + iovec_len = 0; + + try { + this_arg = napi.get_cb_info(info, frame); + + req = napi.get_request_info(this_arg); + + opcode = napi.get_value_uint32(napi.get_named_property(frame, + "opcode")); + if (opcode == NXT_WEBSOCKET_OP_CLOSE) { + sc = napi.get_value_uint32(napi.get_named_property(frame, + "closeStatus")); + status_code[0] = (sc >> 8) & 0xFF; + status_code[1] = sc & 0xFF; + + iov[iovec_len].iov_base = status_code; + iov[iovec_len].iov_len = 2; + iovec_len++; + } + + try { + fin = napi.get_value_bool(napi.get_named_property(frame, "fin")); + + } catch (exception &e) { + fin = true; + } + + payload = napi.get_named_property(frame, "binaryPayload"); + + if (napi.is_buffer(payload)) { + iov[iovec_len].iov_base = napi.get_buffer_info(payload, buf_len); + + } else { + buf_len = 0; + } + + } catch (exception &e) { + napi.throw_error(e); + return nullptr; } - buf->free += have_buf_len; + if (buf_len > 0) { + iov[iovec_len].iov_len = buf_len; + iovec_len++; + } - ret = nxt_unit_buf_send(buf); + ret = nxt_unit_websocket_sendv(req, opcode, fin ? 1 : 0, iov, iovec_len); if (ret != NXT_UNIT_OK) { goto failed; } @@ -678,34 +896,65 @@ Unit::response_write(napi_env env, napi_callback_info info) failed: - napi.throw_error("Failed to write body"); + napi.throw_error("Failed to send frame"); return nullptr; } napi_value -Unit::response_end(napi_env env, napi_callback_info info) +Unit::websocket_set_sock(napi_env env, napi_callback_info info) { - size_t argc; nxt_napi napi(env); - napi_value resp, this_arg, req_num; + napi_value this_arg, sock; + req_data_t *req_data; nxt_unit_request_info_t *req; - argc = 1; - try { - this_arg = napi.get_cb_info(info, argc, &resp); + this_arg = napi.get_cb_info(info, sock); - req_num = napi.get_named_property(resp, "_req_point"); - req = napi.get_request_info(req_num); + req = napi.get_request_info(sock); + + req_data = (req_data_t *) req->data; + req_data->conn_ref = napi.wrap(this_arg, req, conn_destroy); } catch (exception &e) { napi.throw_error(e); return nullptr; } - nxt_unit_request_done(req, NXT_UNIT_OK); - return this_arg; } + + +void +Unit::conn_destroy(napi_env env, void *nativeObject, void *finalize_hint) +{ + nxt_unit_request_info_t *req; + + req = (nxt_unit_request_info_t *) nativeObject; + + nxt_unit_warn(NULL, "conn_destroy: %p", req); +} + + +void +Unit::sock_destroy(napi_env env, void *nativeObject, void *finalize_hint) +{ + nxt_unit_request_info_t *req; + + req = (nxt_unit_request_info_t *) nativeObject; + + nxt_unit_warn(NULL, "sock_destroy: %p", req); +} + + +void +Unit::resp_destroy(napi_env env, void *nativeObject, void *finalize_hint) +{ + nxt_unit_request_info_t *req; + + req = (nxt_unit_request_info_t *) nativeObject; + + nxt_unit_warn(NULL, "resp_destroy: %p", req); +} diff --git a/src/nodejs/unit-http/unit.h b/src/nodejs/unit-http/unit.h index e76d805a..f5eaf9fd 100644 --- a/src/nodejs/unit-http/unit.h +++ b/src/nodejs/unit-http/unit.h @@ -19,14 +19,28 @@ private: static napi_value create(napi_env env, napi_callback_info info); static void destroy(napi_env env, void *nativeObject, void *finalize_hint); + static void conn_destroy(napi_env env, void *nativeObject, void *finalize_hint); + static void sock_destroy(napi_env env, void *nativeObject, void *finalize_hint); + static void resp_destroy(napi_env env, void *nativeObject, void *finalize_hint); static napi_value create_server(napi_env env, napi_callback_info info); static napi_value listen(napi_env env, napi_callback_info info); static napi_value _read(napi_env env, napi_callback_info info); - static void request_handler(nxt_unit_request_info_t *req); + + static void request_handler_cb(nxt_unit_request_info_t *req); + void request_handler(nxt_unit_request_info_t *req); + + static void websocket_handler_cb(nxt_unit_websocket_frame_t *ws); + void websocket_handler(nxt_unit_websocket_frame_t *ws); + + static void close_handler_cb(nxt_unit_request_info_t *req); + void close_handler(nxt_unit_request_info_t *req); + static int add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); static void remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id); - static void quit(nxt_unit_ctx_t *ctx); + + static void quit_cb(nxt_unit_ctx_t *ctx); + void quit(nxt_unit_ctx_t *ctx); napi_value get_server_object(); @@ -35,20 +49,25 @@ private: napi_value create_request(napi_value server_obj, napi_value socket); - napi_value create_response(napi_value server_obj, napi_value socket, - napi_value request, + napi_value create_response(napi_value server_obj, napi_value request, nxt_unit_request_info_t *req); + napi_value create_websocket_frame(napi_value server_obj, + nxt_unit_websocket_frame_t *ws); + static napi_value response_send_headers(napi_env env, napi_callback_info info); static napi_value response_write(napi_env env, napi_callback_info info); static napi_value response_end(napi_env env, napi_callback_info info); + static napi_value websocket_send_frame(napi_env env, + napi_callback_info info); + static napi_value websocket_set_sock(napi_env env, napi_callback_info info); void create_headers(nxt_unit_request_info_t *req, napi_value request); void append_header(nxt_unit_field_t *f, napi_value headers, - napi_value raw_headers, uint32_t idx); + napi_value raw_headers, uint32_t idx); static napi_ref constructor_; diff --git a/src/nodejs/unit-http/utils.js b/src/nodejs/unit-http/utils.js new file mode 100644 index 00000000..e1e51b0e --- /dev/null +++ b/src/nodejs/unit-http/utils.js @@ -0,0 +1,73 @@ +var noop = exports.noop = function(){}; + +exports.extend = function extend(dest, source) { + for (var prop in source) { + dest[prop] = source[prop]; + } +}; + +exports.eventEmitterListenerCount = + require('events').EventEmitter.listenerCount || + function(emitter, type) { return emitter.listeners(type).length; }; + +exports.bufferAllocUnsafe = Buffer.allocUnsafe ? + Buffer.allocUnsafe : + function oldBufferAllocUnsafe(size) { return new Buffer(size); }; + +exports.bufferFromString = Buffer.from ? + Buffer.from : + function oldBufferFromString(string, encoding) { + return new Buffer(string, encoding); + }; + +exports.BufferingLogger = function createBufferingLogger(identifier, uniqueID) { + try { + var logFunction = require('debug')(identifier); + } + catch(e) { + logFunction = noop; + logFunction.enabled = false; + } + + if (logFunction.enabled) { + var logger = new BufferingLogger(identifier, uniqueID, logFunction); + var debug = logger.log.bind(logger); + debug.printOutput = logger.printOutput.bind(logger); + debug.enabled = logFunction.enabled; + return debug; + } + logFunction.printOutput = noop; + return logFunction; +}; + +function BufferingLogger(identifier, uniqueID, logFunction) { + this.logFunction = logFunction; + this.identifier = identifier; + this.uniqueID = uniqueID; + this.buffer = []; +} + +BufferingLogger.prototype.log = function() { + this.buffer.push([ new Date(), Array.prototype.slice.call(arguments) ]); + return this; +}; + +BufferingLogger.prototype.clear = function() { + this.buffer = []; + return this; +}; + +BufferingLogger.prototype.printOutput = function(logFunction) { + if (!logFunction) { logFunction = this.logFunction; } + var uniqueID = this.uniqueID; + this.buffer.forEach(function(entry) { + var date = entry[0].toLocaleString(); + var args = entry[1].slice(); + var formatString = args[0]; + if (formatString !== (void 0) && formatString !== null) { + formatString = '%s - %s - ' + formatString.toString(); + args.splice(0, 1, formatString, date, uniqueID); + logFunction.apply(global, args); + } + }); +}; diff --git a/src/nodejs/unit-http/websocket.js b/src/nodejs/unit-http/websocket.js new file mode 100644 index 00000000..36d0e07a --- /dev/null +++ b/src/nodejs/unit-http/websocket.js @@ -0,0 +1,14 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + +'use strict'; + +module.exports = { + 'server' : require('./websocket_server'), + 'router' : require('./websocket_router'), + 'frame' : require('./websocket_frame'), + 'request' : require('./websocket_request'), + 'connection' : require('./websocket_connection'), +}; diff --git a/src/nodejs/unit-http/websocket_connection.js b/src/nodejs/unit-http/websocket_connection.js new file mode 100644 index 00000000..4eccf6bf --- /dev/null +++ b/src/nodejs/unit-http/websocket_connection.js @@ -0,0 +1,683 @@ +/************************************************************************ + * Copyright 2010-2015 Brian McKelvey. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***********************************************************************/ + +var util = require('util'); +var utils = require('./utils'); +var unit_lib = require('./build/Release/unit-http'); +var EventEmitter = require('events').EventEmitter; +var WebSocketFrame = require('./websocket_frame'); +var bufferAllocUnsafe = utils.bufferAllocUnsafe; +var bufferFromString = utils.bufferFromString; + +// Connected, fully-open, ready to send and receive frames +const STATE_OPEN = 'open'; +// Received a close frame from the remote peer +const STATE_PEER_REQUESTED_CLOSE = 'peer_requested_close'; +// Sent close frame to remote peer. No further data can be sent. +const STATE_ENDING = 'ending'; +// Connection is fully closed. No further data can be sent or received. +const STATE_CLOSED = 'closed'; + +var idCounter = 0; + +function WebSocketConnection(socket, extensions, protocol, maskOutgoingPackets, config) { + this._debug = utils.BufferingLogger('websocket:connection', ++idCounter); + this._debug('constructor'); + + if (this._debug.enabled) { + instrumentSocketForDebugging(this, socket); + } + + // Superclass Constructor + EventEmitter.call(this); + + this._pingListenerCount = 0; + this.on('newListener', function(ev) { + if (ev === 'ping'){ + this._pingListenerCount++; + } + }).on('removeListener', function(ev) { + if (ev === 'ping') { + this._pingListenerCount--; + } + }); + + this.config = config; + this.socket = socket; + this.protocol = protocol; + this.extensions = extensions; + this.remoteAddress = socket.remoteAddress; + this.closeReasonCode = -1; + this.closeDescription = null; + this.closeEventEmitted = false; + + // We have to mask outgoing packets if we're acting as a WebSocket client. + this.maskOutgoingPackets = maskOutgoingPackets; + + this.fragmentationSize = 0; // data received so far... + this.frameQueue = []; + + // Various bits of connection state + this.connected = true; + this.state = STATE_OPEN; + this.waitingForCloseResponse = false; + // Received TCP FIN, socket's readable stream is finished. + this.receivedEnd = false; + + this.closeTimeout = this.config.closeTimeout; + this.assembleFragments = this.config.assembleFragments; + this.maxReceivedMessageSize = this.config.maxReceivedMessageSize; + + this.outputBufferFull = false; + this.inputPaused = false; + this._closeTimerHandler = this.handleCloseTimer.bind(this); + + // Disable nagle algorithm? + this.socket.setNoDelay(this.config.disableNagleAlgorithm); + + // Make sure there is no socket inactivity timeout + this.socket.setTimeout(0); + + // The HTTP Client seems to subscribe to socket error events + // and re-dispatch them in such a way that doesn't make sense + // for users of our client, so we want to make sure nobody + // else is listening for error events on the socket besides us. + this.socket.removeAllListeners('error'); + + this._set_sock(this.socket); +} + +WebSocketConnection.prototype._set_sock = unit_lib.websocket_set_sock; +WebSocketConnection.prototype._end = unit_lib.response_end; + +WebSocketConnection.CLOSE_REASON_NORMAL = 1000; +WebSocketConnection.CLOSE_REASON_GOING_AWAY = 1001; +WebSocketConnection.CLOSE_REASON_PROTOCOL_ERROR = 1002; +WebSocketConnection.CLOSE_REASON_UNPROCESSABLE_INPUT = 1003; +WebSocketConnection.CLOSE_REASON_RESERVED = 1004; // Reserved value. Undefined meaning. +WebSocketConnection.CLOSE_REASON_NOT_PROVIDED = 1005; // Not to be used on the wire +WebSocketConnection.CLOSE_REASON_ABNORMAL = 1006; // Not to be used on the wire +WebSocketConnection.CLOSE_REASON_INVALID_DATA = 1007; +WebSocketConnection.CLOSE_REASON_POLICY_VIOLATION = 1008; +WebSocketConnection.CLOSE_REASON_MESSAGE_TOO_BIG = 1009; +WebSocketConnection.CLOSE_REASON_EXTENSION_REQUIRED = 1010; +WebSocketConnection.CLOSE_REASON_INTERNAL_SERVER_ERROR = 1011; +WebSocketConnection.CLOSE_REASON_TLS_HANDSHAKE_FAILED = 1015; // Not to be used on the wire + +WebSocketConnection.CLOSE_DESCRIPTIONS = { + 1000: 'Normal connection closure', + 1001: 'Remote peer is going away', + 1002: 'Protocol error', + 1003: 'Unprocessable input', + 1004: 'Reserved', + 1005: 'Reason not provided', + 1006: 'Abnormal closure, no further detail available', + 1007: 'Invalid data received', + 1008: 'Policy violation', + 1009: 'Message too big', + 1010: 'Extension requested by client is required', + 1011: 'Internal Server Error', + 1015: 'TLS Handshake Failed' +}; + +function validateCloseReason(code) { + if (code < 1000) { + // Status codes in the range 0-999 are not used + return false; + } + if (code >= 1000 && code <= 2999) { + // Codes from 1000 - 2999 are reserved for use by the protocol. Only + // a few codes are defined, all others are currently illegal. + return [1000, 1001, 1002, 1003, 1007, 1008, 1009, 1010, 1011, 1012, 1013, 1014].indexOf(code) !== -1; + } + if (code >= 3000 && code <= 3999) { + // Reserved for use by libraries, frameworks, and applications. + // Should be registered with IANA. Interpretation of these codes is + // undefined by the WebSocket protocol. + return true; + } + if (code >= 4000 && code <= 4999) { + // Reserved for private use. Interpretation of these codes is + // undefined by the WebSocket protocol. + return true; + } + if (code >= 5000) { + return false; + } +} + +util.inherits(WebSocketConnection, EventEmitter); + +WebSocketConnection.prototype._addSocketEventListeners = function() { + this.socket.on('error', this.handleSocketError.bind(this)); + this.socket.on('end', this.handleSocketEnd.bind(this)); + this.socket.on('close', this.handleSocketClose.bind(this)); +}; + +WebSocketConnection.prototype.handleSocketError = function(error) { + this._debug('handleSocketError: %j', error); + if (this.state === STATE_CLOSED) { + // See https://github.com/theturtle32/WebSocket-Node/issues/288 + this._debug(' --- Socket \'error\' after \'close\''); + return; + } + this.closeReasonCode = WebSocketConnection.CLOSE_REASON_ABNORMAL; + this.closeDescription = 'Socket Error: ' + error.syscall + ' ' + error.code; + this.connected = false; + this.state = STATE_CLOSED; + this.fragmentationSize = 0; + if (utils.eventEmitterListenerCount(this, 'error') > 0) { + this.emit('error', error); + } + this.socket.destroy(error); + this._debug.printOutput(); + + this._end(); +}; + +WebSocketConnection.prototype.handleSocketEnd = function() { + this._debug('handleSocketEnd: received socket end. state = %s', this.state); + this.receivedEnd = true; + if (this.state === STATE_CLOSED) { + // When using the TLS module, sometimes the socket will emit 'end' + // after it emits 'close'. I don't think that's correct behavior, + // but we should deal with it gracefully by ignoring it. + this._debug(' --- Socket \'end\' after \'close\''); + return; + } + if (this.state !== STATE_PEER_REQUESTED_CLOSE && + this.state !== STATE_ENDING) { + this._debug(' --- UNEXPECTED socket end.'); + this.socket.end(); + + this._end(); + } +}; + +WebSocketConnection.prototype.handleSocketClose = function(hadError) { + this._debug('handleSocketClose: received socket close'); + this.socketHadError = hadError; + this.connected = false; + this.state = STATE_CLOSED; + // If closeReasonCode is still set to -1 at this point then we must + // not have received a close frame!! + if (this.closeReasonCode === -1) { + this.closeReasonCode = WebSocketConnection.CLOSE_REASON_ABNORMAL; + this.closeDescription = 'Connection dropped by remote peer.'; + } + this.clearCloseTimer(); + if (!this.closeEventEmitted) { + this.closeEventEmitted = true; + this._debug('-- Emitting WebSocketConnection close event'); + this.emit('close', this.closeReasonCode, this.closeDescription); + } +}; + +WebSocketConnection.prototype.close = function(reasonCode, description) { + if (this.connected) { + this._debug('close: Initating clean WebSocket close sequence.'); + if ('number' !== typeof reasonCode) { + reasonCode = WebSocketConnection.CLOSE_REASON_NORMAL; + } + if (!validateCloseReason(reasonCode)) { + throw new Error('Close code ' + reasonCode + ' is not valid.'); + } + if ('string' !== typeof description) { + description = WebSocketConnection.CLOSE_DESCRIPTIONS[reasonCode]; + } + this.closeReasonCode = reasonCode; + this.closeDescription = description; + this.setCloseTimer(); + this.sendCloseFrame(this.closeReasonCode, this.closeDescription); + this.state = STATE_ENDING; + this.connected = false; + } +}; + +WebSocketConnection.prototype.drop = function(reasonCode, description, skipCloseFrame) { + this._debug('drop'); + if (typeof(reasonCode) !== 'number') { + reasonCode = WebSocketConnection.CLOSE_REASON_PROTOCOL_ERROR; + } + + if (typeof(description) !== 'string') { + // If no description is provided, try to look one up based on the + // specified reasonCode. + description = WebSocketConnection.CLOSE_DESCRIPTIONS[reasonCode]; + } + + this._debug('Forcefully dropping connection. skipCloseFrame: %s, code: %d, description: %s', + skipCloseFrame, reasonCode, description + ); + + this.closeReasonCode = reasonCode; + this.closeDescription = description; + this.frameQueue = []; + this.fragmentationSize = 0; + if (!skipCloseFrame) { + this.sendCloseFrame(reasonCode, description); + } + this.connected = false; + this.state = STATE_CLOSED; + this.clearCloseTimer(); + + if (!this.closeEventEmitted) { + this.closeEventEmitted = true; + this._debug('Emitting WebSocketConnection close event'); + this.emit('close', this.closeReasonCode, this.closeDescription); + } + + this._debug('Drop: destroying socket'); + this.socket.destroy(); + + this._end(); +}; + +WebSocketConnection.prototype.setCloseTimer = function() { + this._debug('setCloseTimer'); + this.clearCloseTimer(); + this._debug('Setting close timer'); + this.waitingForCloseResponse = true; + this.closeTimer = setTimeout(this._closeTimerHandler, this.closeTimeout); +}; + +WebSocketConnection.prototype.clearCloseTimer = function() { + this._debug('clearCloseTimer'); + if (this.closeTimer) { + this._debug('Clearing close timer'); + clearTimeout(this.closeTimer); + this.waitingForCloseResponse = false; + this.closeTimer = null; + } +}; + +WebSocketConnection.prototype.handleCloseTimer = function() { + this._debug('handleCloseTimer'); + this.closeTimer = null; + if (this.waitingForCloseResponse) { + this._debug('Close response not received from client. Forcing socket end.'); + this.waitingForCloseResponse = false; + this.state = STATE_CLOSED; + this.socket.end(); + + this._end(); + } +}; + +WebSocketConnection.prototype.processFrame = function(frame) { + if (!this.connected) { + return; + } + + this._debug('processFrame'); + this._debug(' -- frame: %s', frame); + + // Any non-control opcode besides 0x00 (continuation) received in the + // middle of a fragmented message is illegal. + if (this.frameQueue.length !== 0 && (frame.opcode > 0x00 && frame.opcode < 0x08)) { + this.drop(WebSocketConnection.CLOSE_REASON_PROTOCOL_ERROR, + 'Illegal frame opcode 0x' + frame.opcode.toString(16) + ' ' + + 'received in middle of fragmented message.'); + return; + } + + switch(frame.opcode) { + case 0x02: // WebSocketFrame.BINARY_FRAME + this._debug('-- Binary Frame'); + if (this.assembleFragments) { + if (frame.fin) { + // Complete single-frame message received + this._debug('---- Emitting \'message\' event'); + this.emit('message', { + type: 'binary', + binaryData: frame.binaryPayload + }); + } + else { + // beginning of a fragmented message + this.frameQueue.push(frame); + this.fragmentationSize = frame.length; + } + } + break; + case 0x01: // WebSocketFrame.TEXT_FRAME + this._debug('-- Text Frame'); + if (this.assembleFragments) { + if (frame.fin) { + // Complete single-frame message received + this._debug('---- Emitting \'message\' event'); + this.emit('message', { + type: 'utf8', + utf8Data: frame.binaryPayload.toString('utf8') + }); + } + else { + // beginning of a fragmented message + this.frameQueue.push(frame); + this.fragmentationSize = frame.length; + } + } + break; + case 0x00: // WebSocketFrame.CONTINUATION + this._debug('-- Continuation Frame'); + if (this.assembleFragments) { + if (this.frameQueue.length === 0) { + this.drop(WebSocketConnection.CLOSE_REASON_PROTOCOL_ERROR, + 'Unexpected Continuation Frame'); + return; + } + + this.fragmentationSize += frame.length; + + if (this.fragmentationSize > this.maxReceivedMessageSize) { + this.drop(WebSocketConnection.CLOSE_REASON_MESSAGE_TOO_BIG, + 'Maximum message size exceeded.'); + return; + } + + this.frameQueue.push(frame); + + if (frame.fin) { + // end of fragmented message, so we process the whole + // message now. We also have to decode the utf-8 data + // for text frames after combining all the fragments. + var bytesCopied = 0; + var binaryPayload = bufferAllocUnsafe(this.fragmentationSize); + var opcode = this.frameQueue[0].opcode; + this.frameQueue.forEach(function (currentFrame) { + currentFrame.binaryPayload.copy(binaryPayload, bytesCopied); + bytesCopied += currentFrame.binaryPayload.length; + }); + this.frameQueue = []; + this.fragmentationSize = 0; + + switch (opcode) { + case 0x02: // WebSocketOpcode.BINARY_FRAME + this.emit('message', { + type: 'binary', + binaryData: binaryPayload + }); + break; + case 0x01: // WebSocketOpcode.TEXT_FRAME + this.emit('message', { + type: 'utf8', + utf8Data: binaryPayload.toString('utf8') + }); + break; + default: + this.drop(WebSocketConnection.CLOSE_REASON_PROTOCOL_ERROR, + 'Unexpected first opcode in fragmentation sequence: 0x' + opcode.toString(16)); + return; + } + } + } + break; + case 0x09: // WebSocketFrame.PING + this._debug('-- Ping Frame'); + + if (this._pingListenerCount > 0) { + // logic to emit the ping frame: this is only done when a listener is known to exist + // Expose a function allowing the user to override the default ping() behavior + var cancelled = false; + var cancel = function() { + cancelled = true; + }; + this.emit('ping', cancel, frame.binaryPayload); + + // Only send a pong if the client did not indicate that he would like to cancel + if (!cancelled) { + this.pong(frame.binaryPayload); + } + } + else { + this.pong(frame.binaryPayload); + } + + break; + case 0x0A: // WebSocketFrame.PONG + this._debug('-- Pong Frame'); + this.emit('pong', frame.binaryPayload); + break; + case 0x08: // WebSocketFrame.CONNECTION_CLOSE + this._debug('-- Close Frame'); + if (this.waitingForCloseResponse) { + // Got response to our request to close the connection. + // Close is complete, so we just hang up. + this._debug('---- Got close response from peer. Completing closing handshake.'); + this.clearCloseTimer(); + this.waitingForCloseResponse = false; + this.state = STATE_CLOSED; + this.socket.end(); + + this._end(); + return; + } + + this._debug('---- Closing handshake initiated by peer.'); + // Got request from other party to close connection. + // Send back acknowledgement and then hang up. + this.state = STATE_PEER_REQUESTED_CLOSE; + var respondCloseReasonCode; + + // Make sure the close reason provided is legal according to + // the protocol spec. Providing no close status is legal. + // WebSocketFrame sets closeStatus to -1 by default, so if it + // is still -1, then no status was provided. + if (frame.invalidCloseFrameLength) { + this.closeReasonCode = 1005; // 1005 = No reason provided. + respondCloseReasonCode = WebSocketConnection.CLOSE_REASON_PROTOCOL_ERROR; + } + else if (frame.closeStatus === -1 || validateCloseReason(frame.closeStatus)) { + this.closeReasonCode = frame.closeStatus; + respondCloseReasonCode = WebSocketConnection.CLOSE_REASON_NORMAL; + } + else { + this.closeReasonCode = frame.closeStatus; + respondCloseReasonCode = WebSocketConnection.CLOSE_REASON_PROTOCOL_ERROR; + } + + // If there is a textual description in the close frame, extract it. + if (frame.binaryPayload.length > 1) { + this.closeDescription = frame.binaryPayload.toString('utf8'); + } + else { + this.closeDescription = WebSocketConnection.CLOSE_DESCRIPTIONS[this.closeReasonCode]; + } + this._debug( + '------ Remote peer %s - code: %d - %s - close frame payload length: %d', + this.remoteAddress, this.closeReasonCode, + this.closeDescription, frame.length + ); + this._debug('------ responding to remote peer\'s close request.'); + this.drop(respondCloseReasonCode, null); + this.connected = false; + break; + default: + this._debug('-- Unrecognized Opcode %d', frame.opcode); + this.drop(WebSocketConnection.CLOSE_REASON_PROTOCOL_ERROR, + 'Unrecognized Opcode: 0x' + frame.opcode.toString(16)); + break; + } +}; + +WebSocketConnection.prototype.send = function(data, cb) { + this._debug('send'); + if (Buffer.isBuffer(data)) { + this.sendBytes(data, cb); + } + else if (typeof(data['toString']) === 'function') { + this.sendUTF(data, cb); + } + else { + throw new Error('Data provided must either be a Node Buffer or implement toString()'); + } +}; + +WebSocketConnection.prototype.sendUTF = function(data, cb) { + data = bufferFromString(data.toString(), 'utf8'); + this._debug('sendUTF: %d bytes', data.length); + + var frame = new WebSocketFrame(); + frame.opcode = 0x01; // WebSocketOpcode.TEXT_FRAME + frame.binaryPayload = data; + + this.fragmentAndSend(frame, cb); +}; + +WebSocketConnection.prototype.sendBytes = function(data, cb) { + this._debug('sendBytes'); + if (!Buffer.isBuffer(data)) { + throw new Error('You must pass a Node Buffer object to WebSocketConnection.prototype.sendBytes()'); + } + + var frame = new WebSocketFrame(); + frame.opcode = 0x02; // WebSocketOpcode.BINARY_FRAME + frame.binaryPayload = data; + + this.fragmentAndSend(frame, cb); +}; + +WebSocketConnection.prototype.ping = function(data) { + this._debug('ping'); + + var frame = new WebSocketFrame(); + frame.opcode = 0x09; // WebSocketOpcode.PING + frame.fin = true; + + if (data) { + if (!Buffer.isBuffer(data)) { + data = bufferFromString(data.toString(), 'utf8'); + } + if (data.length > 125) { + this._debug('WebSocket: Data for ping is longer than 125 bytes. Truncating.'); + data = data.slice(0,124); + } + frame.binaryPayload = data; + } + + this.sendFrame(frame); +}; + +// Pong frames have to echo back the contents of the data portion of the +// ping frame exactly, byte for byte. +WebSocketConnection.prototype.pong = function(binaryPayload) { + this._debug('pong'); + + var frame = new WebSocketFrame(); + frame.opcode = 0x0A; // WebSocketOpcode.PONG + if (Buffer.isBuffer(binaryPayload) && binaryPayload.length > 125) { + this._debug('WebSocket: Data for pong is longer than 125 bytes. Truncating.'); + binaryPayload = binaryPayload.slice(0,124); + } + frame.binaryPayload = binaryPayload; + frame.fin = true; + + this.sendFrame(frame); +}; + +WebSocketConnection.prototype.fragmentAndSend = function(frame, cb) { + this._debug('fragmentAndSend'); + if (frame.opcode > 0x07) { + throw new Error('You cannot fragment control frames.'); + } + + var threshold = this.config.fragmentationThreshold; + var length = frame.binaryPayload.length; + + // Send immediately if fragmentation is disabled or the message is not + // larger than the fragmentation threshold. + if (!this.config.fragmentOutgoingMessages || (frame.binaryPayload && length <= threshold)) { + frame.fin = true; + this.sendFrame(frame, cb); + return; + } + + var numFragments = Math.ceil(length / threshold); + var sentFragments = 0; + var sentCallback = function fragmentSentCallback(err) { + if (err) { + if (typeof cb === 'function') { + // pass only the first error + cb(err); + cb = null; + } + return; + } + ++sentFragments; + if ((sentFragments === numFragments) && (typeof cb === 'function')) { + cb(); + } + }; + for (var i=1; i <= numFragments; i++) { + var currentFrame = new WebSocketFrame(); + + // continuation opcode except for first frame. + currentFrame.opcode = (i === 1) ? frame.opcode : 0x00; + + // fin set on last frame only + currentFrame.fin = (i === numFragments); + + // length is likely to be shorter on the last fragment + var currentLength = (i === numFragments) ? length - (threshold * (i-1)) : threshold; + var sliceStart = threshold * (i-1); + + // Slice the right portion of the original payload + currentFrame.binaryPayload = frame.binaryPayload.slice(sliceStart, sliceStart + currentLength); + + this.sendFrame(currentFrame, sentCallback); + } +}; + +WebSocketConnection.prototype.sendCloseFrame = function(reasonCode, description, cb) { + if (typeof(reasonCode) !== 'number') { + reasonCode = WebSocketConnection.CLOSE_REASON_NORMAL; + } + + this._debug('sendCloseFrame state: %s, reasonCode: %d, description: %s', this.state, reasonCode, description); + + if (this.state !== STATE_OPEN && this.state !== STATE_PEER_REQUESTED_CLOSE) { return; } + + var frame = new WebSocketFrame(); + frame.fin = true; + frame.opcode = 0x08; // WebSocketOpcode.CONNECTION_CLOSE + frame.closeStatus = reasonCode; + if (typeof(description) === 'string') { + frame.binaryPayload = bufferFromString(description, 'utf8'); + } + + this.sendFrame(frame, cb); + this.socket.end(); +}; + +WebSocketConnection.prototype._send_frame = unit_lib.websocket_send_frame; + +WebSocketConnection.prototype.sendFrame = function(frame, cb) { + this._debug('sendFrame'); + + frame.mask = this.maskOutgoingPackets; + + this._send_frame(frame); + + if (typeof cb === 'function') { + cb(); + } + + var flushed = 0; // this.socket.write(frame.toBuffer(), cb); + this.outputBufferFull = !flushed; + return flushed; +}; + +module.exports = WebSocketConnection; diff --git a/src/nodejs/unit-http/websocket_frame.js b/src/nodejs/unit-http/websocket_frame.js new file mode 100644 index 00000000..9989937d --- /dev/null +++ b/src/nodejs/unit-http/websocket_frame.js @@ -0,0 +1,11 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + +'use strict'; + +function WebSocketFrame() { +} + +module.exports = WebSocketFrame; diff --git a/src/nodejs/unit-http/websocket_request.js b/src/nodejs/unit-http/websocket_request.js new file mode 100644 index 00000000..d84e428b --- /dev/null +++ b/src/nodejs/unit-http/websocket_request.js @@ -0,0 +1,509 @@ +/************************************************************************ + * Copyright 2010-2015 Brian McKelvey. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***********************************************************************/ + +var util = require('util'); +var url = require('url'); +var EventEmitter = require('events').EventEmitter; +var WebSocketConnection = require('./websocket_connection'); + +var headerValueSplitRegExp = /,\s*/; +var headerParamSplitRegExp = /;\s*/; +var headerSanitizeRegExp = /[\r\n]/g; +var xForwardedForSeparatorRegExp = /,\s*/; +var separators = [ + '(', ')', '<', '>', '@', + ',', ';', ':', '\\', '\"', + '/', '[', ']', '?', '=', + '{', '}', ' ', String.fromCharCode(9) +]; +var controlChars = [String.fromCharCode(127) /* DEL */]; +for (var i=0; i < 31; i ++) { + /* US-ASCII Control Characters */ + controlChars.push(String.fromCharCode(i)); +} + +var cookieNameValidateRegEx = /([\x00-\x20\x22\x28\x29\x2c\x2f\x3a-\x3f\x40\x5b-\x5e\x7b\x7d\x7f])/; +var cookieValueValidateRegEx = /[^\x21\x23-\x2b\x2d-\x3a\x3c-\x5b\x5d-\x7e]/; +var cookieValueDQuoteValidateRegEx = /^"[^"]*"$/; +var controlCharsAndSemicolonRegEx = /[\x00-\x20\x3b]/g; + +var cookieSeparatorRegEx = /[;,] */; + +var httpStatusDescriptions = { + 100: 'Continue', + 101: 'Switching Protocols', + 200: 'OK', + 201: 'Created', + 203: 'Non-Authoritative Information', + 204: 'No Content', + 205: 'Reset Content', + 206: 'Partial Content', + 300: 'Multiple Choices', + 301: 'Moved Permanently', + 302: 'Found', + 303: 'See Other', + 304: 'Not Modified', + 305: 'Use Proxy', + 307: 'Temporary Redirect', + 400: 'Bad Request', + 401: 'Unauthorized', + 402: 'Payment Required', + 403: 'Forbidden', + 404: 'Not Found', + 406: 'Not Acceptable', + 407: 'Proxy Authorization Required', + 408: 'Request Timeout', + 409: 'Conflict', + 410: 'Gone', + 411: 'Length Required', + 412: 'Precondition Failed', + 413: 'Request Entity Too Long', + 414: 'Request-URI Too Long', + 415: 'Unsupported Media Type', + 416: 'Requested Range Not Satisfiable', + 417: 'Expectation Failed', + 426: 'Upgrade Required', + 500: 'Internal Server Error', + 501: 'Not Implemented', + 502: 'Bad Gateway', + 503: 'Service Unavailable', + 504: 'Gateway Timeout', + 505: 'HTTP Version Not Supported' +}; + +function WebSocketRequest(socket, httpRequest, serverConfig) { + // Superclass Constructor + EventEmitter.call(this); + + this.socket = socket; + this.httpRequest = httpRequest; + this.resource = httpRequest.url; + this.remoteAddress = socket.remoteAddress; + this.remoteAddresses = [this.remoteAddress]; + this.serverConfig = serverConfig; + + // Watch for the underlying TCP socket closing before we call accept + this._socketIsClosing = false; + this._socketCloseHandler = this._handleSocketCloseBeforeAccept.bind(this); + this.socket.on('end', this._socketCloseHandler); + this.socket.on('close', this._socketCloseHandler); + + this._resolved = false; +} + +util.inherits(WebSocketRequest, EventEmitter); + +WebSocketRequest.prototype.readHandshake = function() { + var self = this; + var request = this.httpRequest; + + // Decode URL + this.resourceURL = url.parse(this.resource, true); + + this.host = request.headers['host']; + if (!this.host) { + throw new Error('Client must provide a Host header.'); + } + + this.key = request.headers['sec-websocket-key']; + if (!this.key) { + throw new Error('Client must provide a value for Sec-WebSocket-Key.'); + } + + this.webSocketVersion = parseInt(request.headers['sec-websocket-version'], 10); + + if (!this.webSocketVersion || isNaN(this.webSocketVersion)) { + throw new Error('Client must provide a value for Sec-WebSocket-Version.'); + } + + switch (this.webSocketVersion) { + case 8: + case 13: + break; + default: + var e = new Error('Unsupported websocket client version: ' + this.webSocketVersion + + 'Only versions 8 and 13 are supported.'); + e.httpCode = 426; + e.headers = { + 'Sec-WebSocket-Version': '13' + }; + throw e; + } + + if (this.webSocketVersion === 13) { + this.origin = request.headers['origin']; + } + else if (this.webSocketVersion === 8) { + this.origin = request.headers['sec-websocket-origin']; + } + + // Protocol is optional. + var protocolString = request.headers['sec-websocket-protocol']; + this.protocolFullCaseMap = {}; + this.requestedProtocols = []; + if (protocolString) { + var requestedProtocolsFullCase = protocolString.split(headerValueSplitRegExp); + requestedProtocolsFullCase.forEach(function(protocol) { + var lcProtocol = protocol.toLocaleLowerCase(); + self.requestedProtocols.push(lcProtocol); + self.protocolFullCaseMap[lcProtocol] = protocol; + }); + } + + if (!this.serverConfig.ignoreXForwardedFor && + request.headers['x-forwarded-for']) { + var immediatePeerIP = this.remoteAddress; + this.remoteAddresses = request.headers['x-forwarded-for'] + .split(xForwardedForSeparatorRegExp); + this.remoteAddresses.push(immediatePeerIP); + this.remoteAddress = this.remoteAddresses[0]; + } + + // Extensions are optional. + var extensionsString = request.headers['sec-websocket-extensions']; + this.requestedExtensions = this.parseExtensions(extensionsString); + + // Cookies are optional + var cookieString = request.headers['cookie']; + this.cookies = this.parseCookies(cookieString); +}; + +WebSocketRequest.prototype.parseExtensions = function(extensionsString) { + if (!extensionsString || extensionsString.length === 0) { + return []; + } + var extensions = extensionsString.toLocaleLowerCase().split(headerValueSplitRegExp); + extensions.forEach(function(extension, index, array) { + var params = extension.split(headerParamSplitRegExp); + var extensionName = params[0]; + var extensionParams = params.slice(1); + extensionParams.forEach(function(rawParam, index, array) { + var arr = rawParam.split('='); + var obj = { + name: arr[0], + value: arr[1] + }; + array.splice(index, 1, obj); + }); + var obj = { + name: extensionName, + params: extensionParams + }; + array.splice(index, 1, obj); + }); + return extensions; +}; + +// This function adapted from node-cookie +// https://github.com/shtylman/node-cookie +WebSocketRequest.prototype.parseCookies = function(str) { + // Sanity Check + if (!str || typeof(str) !== 'string') { + return []; + } + + var cookies = []; + var pairs = str.split(cookieSeparatorRegEx); + + pairs.forEach(function(pair) { + var eq_idx = pair.indexOf('='); + if (eq_idx === -1) { + cookies.push({ + name: pair, + value: null + }); + return; + } + + var key = pair.substr(0, eq_idx).trim(); + var val = pair.substr(++eq_idx, pair.length).trim(); + + // quoted values + if ('"' === val[0]) { + val = val.slice(1, -1); + } + + cookies.push({ + name: key, + value: decodeURIComponent(val) + }); + }); + + return cookies; +}; + +WebSocketRequest.prototype.accept = function(acceptedProtocol, allowedOrigin, cookies) { + this._verifyResolution(); + + // TODO: Handle extensions + + var protocolFullCase; + + if (acceptedProtocol) { + protocolFullCase = this.protocolFullCaseMap[acceptedProtocol.toLocaleLowerCase()]; + if (typeof(protocolFullCase) === 'undefined') { + protocolFullCase = acceptedProtocol; + } + } + else { + protocolFullCase = acceptedProtocol; + } + this.protocolFullCaseMap = null; + + var response = this.httpRequest._response; + response.statusCode = 101; + + if (protocolFullCase) { + // validate protocol + for (var i=0; i < protocolFullCase.length; i++) { + var charCode = protocolFullCase.charCodeAt(i); + var character = protocolFullCase.charAt(i); + if (charCode < 0x21 || charCode > 0x7E || separators.indexOf(character) !== -1) { + this.reject(500); + throw new Error('Illegal character "' + String.fromCharCode(character) + '" in subprotocol.'); + } + } + if (this.requestedProtocols.indexOf(acceptedProtocol) === -1) { + this.reject(500); + throw new Error('Specified protocol was not requested by the client.'); + } + + protocolFullCase = protocolFullCase.replace(headerSanitizeRegExp, ''); + response += 'Sec-WebSocket-Protocol: ' + protocolFullCase + '\r\n'; + } + this.requestedProtocols = null; + + if (allowedOrigin) { + allowedOrigin = allowedOrigin.replace(headerSanitizeRegExp, ''); + if (this.webSocketVersion === 13) { + response.setHeader('Origin', allowedOrigin); + } + else if (this.webSocketVersion === 8) { + response.setHeader('Sec-WebSocket-Origin', allowedOrigin); + } + } + + if (cookies) { + if (!Array.isArray(cookies)) { + this.reject(500); + throw new Error('Value supplied for "cookies" argument must be an array.'); + } + var seenCookies = {}; + cookies.forEach(function(cookie) { + if (!cookie.name || !cookie.value) { + this.reject(500); + throw new Error('Each cookie to set must at least provide a "name" and "value"'); + } + + // Make sure there are no \r\n sequences inserted + cookie.name = cookie.name.replace(controlCharsAndSemicolonRegEx, ''); + cookie.value = cookie.value.replace(controlCharsAndSemicolonRegEx, ''); + + if (seenCookies[cookie.name]) { + this.reject(500); + throw new Error('You may not specify the same cookie name twice.'); + } + seenCookies[cookie.name] = true; + + // token (RFC 2616, Section 2.2) + var invalidChar = cookie.name.match(cookieNameValidateRegEx); + if (invalidChar) { + this.reject(500); + throw new Error('Illegal character ' + invalidChar[0] + ' in cookie name'); + } + + // RFC 6265, Section 4.1.1 + // *cookie-octet / ( DQUOTE *cookie-octet DQUOTE ) | %x21 / %x23-2B / %x2D-3A / %x3C-5B / %x5D-7E + if (cookie.value.match(cookieValueDQuoteValidateRegEx)) { + invalidChar = cookie.value.slice(1, -1).match(cookieValueValidateRegEx); + } else { + invalidChar = cookie.value.match(cookieValueValidateRegEx); + } + if (invalidChar) { + this.reject(500); + throw new Error('Illegal character ' + invalidChar[0] + ' in cookie value'); + } + + var cookieParts = [cookie.name + '=' + cookie.value]; + + // RFC 6265, Section 4.1.1 + // 'Path=' path-value | <any CHAR except CTLs or ';'> + if(cookie.path){ + invalidChar = cookie.path.match(controlCharsAndSemicolonRegEx); + if (invalidChar) { + this.reject(500); + throw new Error('Illegal character ' + invalidChar[0] + ' in cookie path'); + } + cookieParts.push('Path=' + cookie.path); + } + + // RFC 6265, Section 4.1.2.3 + // 'Domain=' subdomain + if (cookie.domain) { + if (typeof(cookie.domain) !== 'string') { + this.reject(500); + throw new Error('Domain must be specified and must be a string.'); + } + invalidChar = cookie.domain.match(controlCharsAndSemicolonRegEx); + if (invalidChar) { + this.reject(500); + throw new Error('Illegal character ' + invalidChar[0] + ' in cookie domain'); + } + cookieParts.push('Domain=' + cookie.domain.toLowerCase()); + } + + // RFC 6265, Section 4.1.1 + //'Expires=' sane-cookie-date | Force Date object requirement by using only epoch + if (cookie.expires) { + if (!(cookie.expires instanceof Date)){ + this.reject(500); + throw new Error('Value supplied for cookie "expires" must be a vaild date object'); + } + cookieParts.push('Expires=' + cookie.expires.toGMTString()); + } + + // RFC 6265, Section 4.1.1 + //'Max-Age=' non-zero-digit *DIGIT + if (cookie.maxage) { + var maxage = cookie.maxage; + if (typeof(maxage) === 'string') { + maxage = parseInt(maxage, 10); + } + if (isNaN(maxage) || maxage <= 0 ) { + this.reject(500); + throw new Error('Value supplied for cookie "maxage" must be a non-zero number'); + } + maxage = Math.round(maxage); + cookieParts.push('Max-Age=' + maxage.toString(10)); + } + + // RFC 6265, Section 4.1.1 + //'Secure;' + if (cookie.secure) { + if (typeof(cookie.secure) !== 'boolean') { + this.reject(500); + throw new Error('Value supplied for cookie "secure" must be of type boolean'); + } + cookieParts.push('Secure'); + } + + // RFC 6265, Section 4.1.1 + //'HttpOnly;' + if (cookie.httponly) { + if (typeof(cookie.httponly) !== 'boolean') { + this.reject(500); + throw new Error('Value supplied for cookie "httponly" must be of type boolean'); + } + cookieParts.push('HttpOnly'); + } + + response.addHeader('Set-Cookie', cookieParts.join(';')); + }.bind(this)); + } + + // TODO: handle negotiated extensions + // if (negotiatedExtensions) { + // response += 'Sec-WebSocket-Extensions: ' + negotiatedExtensions.join(', ') + '\r\n'; + // } + + // Mark the request resolved now so that the user can't call accept or + // reject a second time. + this._resolved = true; + this.emit('requestResolved', this); + + var connection = new WebSocketConnection(this.socket, [], acceptedProtocol, false, this.serverConfig); + connection.webSocketVersion = this.webSocketVersion; + connection.remoteAddress = this.remoteAddress; + connection.remoteAddresses = this.remoteAddresses; + + var self = this; + + if (this._socketIsClosing) { + // Handle case when the client hangs up before we get a chance to + // accept the connection and send our side of the opening handshake. + cleanupFailedConnection(connection); + + } else { + response._sendHeaders(); + connection._addSocketEventListeners(); + } + + this.emit('requestAccepted', connection); + return connection; +}; + +WebSocketRequest.prototype.reject = function(status, reason, extraHeaders) { + this._verifyResolution(); + + // Mark the request resolved now so that the user can't call accept or + // reject a second time. + this._resolved = true; + this.emit('requestResolved', this); + + if (typeof(status) !== 'number') { + status = 403; + } + + var response = this.httpRequest._response; + + response.statusCode = status; + + if (reason) { + reason = reason.replace(headerSanitizeRegExp, ''); + response.addHeader('X-WebSocket-Reject-Reason', reason); + } + + if (extraHeaders) { + for (var key in extraHeaders) { + var sanitizedValue = extraHeaders[key].toString().replace(headerSanitizeRegExp, ''); + var sanitizedKey = key.replace(headerSanitizeRegExp, ''); + response += (sanitizedKey + ': ' + sanitizedValue + '\r\n'); + } + } + + response.end(); + + this.emit('requestRejected', this); +}; + +WebSocketRequest.prototype._handleSocketCloseBeforeAccept = function() { + this._socketIsClosing = true; + this._removeSocketCloseListeners(); +}; + +WebSocketRequest.prototype._removeSocketCloseListeners = function() { + this.socket.removeListener('end', this._socketCloseHandler); + this.socket.removeListener('close', this._socketCloseHandler); +}; + +WebSocketRequest.prototype._verifyResolution = function() { + if (this._resolved) { + throw new Error('WebSocketRequest may only be accepted or rejected one time.'); + } +}; + +function cleanupFailedConnection(connection) { + // Since we have to return a connection object even if the socket is + // already dead in order not to break the API, we schedule a 'close' + // event on the connection object to occur immediately. + process.nextTick(function() { + // WebSocketConnection.CLOSE_REASON_ABNORMAL = 1006 + // Third param: Skip sending the close frame to a dead socket + connection.drop(1006, 'TCP connection lost before handshake completed.', true); + }); +} + +module.exports = WebSocketRequest; diff --git a/src/nodejs/unit-http/websocket_router.js b/src/nodejs/unit-http/websocket_router.js new file mode 100644 index 00000000..4efa35d2 --- /dev/null +++ b/src/nodejs/unit-http/websocket_router.js @@ -0,0 +1,157 @@ +/************************************************************************ + * Copyright 2010-2015 Brian McKelvey. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***********************************************************************/ + +var extend = require('./utils').extend; +var util = require('util'); +var EventEmitter = require('events').EventEmitter; +var WebSocketRouterRequest = require('./websocket_router_request'); + +function WebSocketRouter(config) { + // Superclass Constructor + EventEmitter.call(this); + + this.config = { + // The WebSocketServer instance to attach to. + server: null + }; + if (config) { + extend(this.config, config); + } + this.handlers = []; + + this._requestHandler = this.handleRequest.bind(this); + if (this.config.server) { + this.attachServer(this.config.server); + } +} + +util.inherits(WebSocketRouter, EventEmitter); + +WebSocketRouter.prototype.attachServer = function(server) { + if (server) { + this.server = server; + this.server.on('request', this._requestHandler); + } + else { + throw new Error('You must specify a WebSocketServer instance to attach to.'); + } +}; + +WebSocketRouter.prototype.detachServer = function() { + if (this.server) { + this.server.removeListener('request', this._requestHandler); + this.server = null; + } + else { + throw new Error('Cannot detach from server: not attached.'); + } +}; + +WebSocketRouter.prototype.mount = function(path, protocol, callback) { + if (!path) { + throw new Error('You must specify a path for this handler.'); + } + if (!protocol) { + protocol = '____no_protocol____'; + } + if (!callback) { + throw new Error('You must specify a callback for this handler.'); + } + + path = this.pathToRegExp(path); + if (!(path instanceof RegExp)) { + throw new Error('Path must be specified as either a string or a RegExp.'); + } + var pathString = path.toString(); + + // normalize protocol to lower-case + protocol = protocol.toLocaleLowerCase(); + + if (this.findHandlerIndex(pathString, protocol) !== -1) { + throw new Error('You may only mount one handler per path/protocol combination.'); + } + + this.handlers.push({ + 'path': path, + 'pathString': pathString, + 'protocol': protocol, + 'callback': callback + }); +}; +WebSocketRouter.prototype.unmount = function(path, protocol) { + var index = this.findHandlerIndex(this.pathToRegExp(path).toString(), protocol); + if (index !== -1) { + this.handlers.splice(index, 1); + } + else { + throw new Error('Unable to find a route matching the specified path and protocol.'); + } +}; + +WebSocketRouter.prototype.findHandlerIndex = function(pathString, protocol) { + protocol = protocol.toLocaleLowerCase(); + for (var i=0, len=this.handlers.length; i < len; i++) { + var handler = this.handlers[i]; + if (handler.pathString === pathString && handler.protocol === protocol) { + return i; + } + } + return -1; +}; + +WebSocketRouter.prototype.pathToRegExp = function(path) { + if (typeof(path) === 'string') { + if (path === '*') { + path = /^.*$/; + } + else { + path = path.replace(/[-[\]{}()*+?.,\\^$|#\s]/g, '\\$&'); + path = new RegExp('^' + path + '$'); + } + } + return path; +}; + +WebSocketRouter.prototype.handleRequest = function(request) { + var requestedProtocols = request.requestedProtocols; + if (requestedProtocols.length === 0) { + requestedProtocols = ['____no_protocol____']; + } + + // Find a handler with the first requested protocol first + for (var i=0; i < requestedProtocols.length; i++) { + var requestedProtocol = requestedProtocols[i].toLocaleLowerCase(); + + // find the first handler that can process this request + for (var j=0, len=this.handlers.length; j < len; j++) { + var handler = this.handlers[j]; + if (handler.path.test(request.resourceURL.pathname)) { + if (requestedProtocol === handler.protocol || + handler.protocol === '*') + { + var routerRequest = new WebSocketRouterRequest(request, requestedProtocol); + handler.callback(routerRequest); + return; + } + } + } + } + + // If we get here we were unable to find a suitable handler. + request.reject(404, 'No handler is available for the given request.'); +}; + +module.exports = WebSocketRouter; diff --git a/src/nodejs/unit-http/websocket_router_request.js b/src/nodejs/unit-http/websocket_router_request.js new file mode 100644 index 00000000..d3e37457 --- /dev/null +++ b/src/nodejs/unit-http/websocket_router_request.js @@ -0,0 +1,54 @@ +/************************************************************************ + * Copyright 2010-2015 Brian McKelvey. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***********************************************************************/ + +var util = require('util'); +var EventEmitter = require('events').EventEmitter; + +function WebSocketRouterRequest(webSocketRequest, resolvedProtocol) { + // Superclass Constructor + EventEmitter.call(this); + + this.webSocketRequest = webSocketRequest; + if (resolvedProtocol === '____no_protocol____') { + this.protocol = null; + } + else { + this.protocol = resolvedProtocol; + } + this.origin = webSocketRequest.origin; + this.resource = webSocketRequest.resource; + this.resourceURL = webSocketRequest.resourceURL; + this.httpRequest = webSocketRequest.httpRequest; + this.remoteAddress = webSocketRequest.remoteAddress; + this.webSocketVersion = webSocketRequest.webSocketVersion; + this.requestedExtensions = webSocketRequest.requestedExtensions; + this.cookies = webSocketRequest.cookies; +} + +util.inherits(WebSocketRouterRequest, EventEmitter); + +WebSocketRouterRequest.prototype.accept = function(origin, cookies) { + var connection = this.webSocketRequest.accept(this.protocol, origin, cookies); + this.emit('requestAccepted', connection); + return connection; +}; + +WebSocketRouterRequest.prototype.reject = function(status, reason, extraHeaders) { + this.webSocketRequest.reject(status, reason, extraHeaders); + this.emit('requestRejected', this); +}; + +module.exports = WebSocketRouterRequest; diff --git a/src/nodejs/unit-http/websocket_server.js b/src/nodejs/unit-http/websocket_server.js new file mode 100644 index 00000000..306f3f67 --- /dev/null +++ b/src/nodejs/unit-http/websocket_server.js @@ -0,0 +1,213 @@ +/************************************************************************ + * Copyright 2010-2015 Brian McKelvey. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***********************************************************************/ + +var extend = require('./utils').extend; +var utils = require('./utils'); +var util = require('util'); +var EventEmitter = require('events').EventEmitter; +var WebSocketRequest = require('./websocket_request'); + +var WebSocketServer = function WebSocketServer(config) { + // Superclass Constructor + EventEmitter.call(this); + + this._handlers = { + upgrade: this.handleUpgrade.bind(this), + requestAccepted: this.handleRequestAccepted.bind(this), + requestResolved: this.handleRequestResolved.bind(this) + }; + this.connections = []; + this.pendingRequests = []; + if (config) { + this.mount(config); + } +}; + +util.inherits(WebSocketServer, EventEmitter); + +WebSocketServer.prototype.mount = function(config) { + this.config = { + // The http server instance to attach to. Required. + httpServer: null, + + // 64KiB max frame size. + maxReceivedFrameSize: 0x10000, + + // 1MiB max message size, only applicable if + // assembleFragments is true + maxReceivedMessageSize: 0x100000, + + // Outgoing messages larger than fragmentationThreshold will be + // split into multiple fragments. + fragmentOutgoingMessages: true, + + // Outgoing frames are fragmented if they exceed this threshold. + // Default is 16KiB + fragmentationThreshold: 0x4000, + + // If true, fragmented messages will be automatically assembled + // and the full message will be emitted via a 'message' event. + // If false, each frame will be emitted via a 'frame' event and + // the application will be responsible for aggregating multiple + // fragmented frames. Single-frame messages will emit a 'message' + // event in addition to the 'frame' event. + // Most users will want to leave this set to 'true' + assembleFragments: true, + + // If this is true, websocket connections will be accepted + // regardless of the path and protocol specified by the client. + // The protocol accepted will be the first that was requested + // by the client. Clients from any origin will be accepted. + // This should only be used in the simplest of cases. You should + // probably leave this set to 'false' and inspect the request + // object to make sure it's acceptable before accepting it. + autoAcceptConnections: false, + + // Whether or not the X-Forwarded-For header should be respected. + // It's important to set this to 'true' when accepting connections + // from untrusted clients, as a malicious client could spoof its + // IP address by simply setting this header. It's meant to be added + // by a trusted proxy or other intermediary within your own + // infrastructure. + // See: http://en.wikipedia.org/wiki/X-Forwarded-For + ignoreXForwardedFor: false, + + // The Nagle Algorithm makes more efficient use of network resources + // by introducing a small delay before sending small packets so that + // multiple messages can be batched together before going onto the + // wire. This however comes at the cost of latency, so the default + // is to disable it. If you don't need low latency and are streaming + // lots of small messages, you can change this to 'false' + disableNagleAlgorithm: true, + + // The number of milliseconds to wait after sending a close frame + // for an acknowledgement to come back before giving up and just + // closing the socket. + closeTimeout: 5000 + }; + extend(this.config, config); + + if (this.config.httpServer) { + if (!Array.isArray(this.config.httpServer)) { + this.config.httpServer = [this.config.httpServer]; + } + var upgradeHandler = this._handlers.upgrade; + this.config.httpServer.forEach(function(httpServer) { + httpServer.on('upgrade', upgradeHandler); + }); + } + else { + throw new Error('You must specify an httpServer on which to mount the WebSocket server.'); + } +}; + +WebSocketServer.prototype.unmount = function() { + var upgradeHandler = this._handlers.upgrade; + this.config.httpServer.forEach(function(httpServer) { + httpServer.removeListener('upgrade', upgradeHandler); + }); +}; + +WebSocketServer.prototype.closeAllConnections = function() { + this.connections.forEach(function(connection) { + connection.close(); + }); + this.pendingRequests.forEach(function(request) { + process.nextTick(function() { + request.reject(503); // HTTP 503 Service Unavailable + }); + }); +}; + +WebSocketServer.prototype.broadcast = function(data) { + if (Buffer.isBuffer(data)) { + this.broadcastBytes(data); + } + else if (typeof(data.toString) === 'function') { + this.broadcastUTF(data); + } +}; + +WebSocketServer.prototype.broadcastUTF = function(utfData) { + this.connections.forEach(function(connection) { + connection.sendUTF(utfData); + }); +}; + +WebSocketServer.prototype.broadcastBytes = function(binaryData) { + this.connections.forEach(function(connection) { + connection.sendBytes(binaryData); + }); +}; + +WebSocketServer.prototype.shutDown = function() { + this.unmount(); + this.closeAllConnections(); +}; + +WebSocketServer.prototype.handleUpgrade = function(request, socket) { + var wsRequest = new WebSocketRequest(socket, request, this.config); + try { + wsRequest.readHandshake(); + } + catch(e) { + wsRequest.reject( + e.httpCode ? e.httpCode : 400, + e.message, + e.headers + ); + return; + } + + this.pendingRequests.push(wsRequest); + + wsRequest.once('requestAccepted', this._handlers.requestAccepted); + wsRequest.once('requestResolved', this._handlers.requestResolved); + + if (!this.config.autoAcceptConnections && utils.eventEmitterListenerCount(this, 'request') > 0) { + this.emit('request', wsRequest); + } + else if (this.config.autoAcceptConnections) { + wsRequest.accept(wsRequest.requestedProtocols[0], wsRequest.origin); + } + else { + wsRequest.reject(404, 'No handler is configured to accept the connection.'); + } +}; + +WebSocketServer.prototype.handleRequestAccepted = function(connection) { + var self = this; + connection.once('close', function(closeReason, description) { + self.handleConnectionClose(connection, closeReason, description); + }); + this.connections.push(connection); + this.emit('connect', connection); +}; + +WebSocketServer.prototype.handleConnectionClose = function(connection, closeReason, description) { + var index = this.connections.indexOf(connection); + if (index !== -1) { + this.connections.splice(index, 1); + } + this.emit('close', connection, closeReason, description); +}; + +WebSocketServer.prototype.handleRequestResolved = function(request) { + var index = this.pendingRequests.indexOf(request); + if (index !== -1) { this.pendingRequests.splice(index, 1); } +}; + +module.exports = WebSocketServer; |