diff options
author | Max Romanov <max.romanov@nginx.com> | 2019-08-20 16:32:05 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2019-08-20 16:32:05 +0300 |
commit | e291841b3379f8787a10ad4f91e4aeae2ae323a4 (patch) | |
tree | 49a2f4629e5b8d6cd48f7436d7eeba4c99905675 /src/nodejs/unit-http/websocket_server.js | |
parent | e501c74ddceab86e48c031ca9b5e154f52dcdae0 (diff) | |
download | unit-e291841b3379f8787a10ad4f91e4aeae2ae323a4.tar.gz unit-e291841b3379f8787a10ad4f91e4aeae2ae323a4.tar.bz2 |
Node.js: introducing websocket support.
Diffstat (limited to '')
-rw-r--r-- | src/nodejs/unit-http/websocket_server.js | 213 |
1 files changed, 213 insertions, 0 deletions
diff --git a/src/nodejs/unit-http/websocket_server.js b/src/nodejs/unit-http/websocket_server.js new file mode 100644 index 00000000..306f3f67 --- /dev/null +++ b/src/nodejs/unit-http/websocket_server.js @@ -0,0 +1,213 @@ +/************************************************************************ + * Copyright 2010-2015 Brian McKelvey. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***********************************************************************/ + +var extend = require('./utils').extend; +var utils = require('./utils'); +var util = require('util'); +var EventEmitter = require('events').EventEmitter; +var WebSocketRequest = require('./websocket_request'); + +var WebSocketServer = function WebSocketServer(config) { + // Superclass Constructor + EventEmitter.call(this); + + this._handlers = { + upgrade: this.handleUpgrade.bind(this), + requestAccepted: this.handleRequestAccepted.bind(this), + requestResolved: this.handleRequestResolved.bind(this) + }; + this.connections = []; + this.pendingRequests = []; + if (config) { + this.mount(config); + } +}; + +util.inherits(WebSocketServer, EventEmitter); + +WebSocketServer.prototype.mount = function(config) { + this.config = { + // The http server instance to attach to. Required. + httpServer: null, + + // 64KiB max frame size. + maxReceivedFrameSize: 0x10000, + + // 1MiB max message size, only applicable if + // assembleFragments is true + maxReceivedMessageSize: 0x100000, + + // Outgoing messages larger than fragmentationThreshold will be + // split into multiple fragments. + fragmentOutgoingMessages: true, + + // Outgoing frames are fragmented if they exceed this threshold. + // Default is 16KiB + fragmentationThreshold: 0x4000, + + // If true, fragmented messages will be automatically assembled + // and the full message will be emitted via a 'message' event. + // If false, each frame will be emitted via a 'frame' event and + // the application will be responsible for aggregating multiple + // fragmented frames. Single-frame messages will emit a 'message' + // event in addition to the 'frame' event. + // Most users will want to leave this set to 'true' + assembleFragments: true, + + // If this is true, websocket connections will be accepted + // regardless of the path and protocol specified by the client. + // The protocol accepted will be the first that was requested + // by the client. Clients from any origin will be accepted. + // This should only be used in the simplest of cases. You should + // probably leave this set to 'false' and inspect the request + // object to make sure it's acceptable before accepting it. + autoAcceptConnections: false, + + // Whether or not the X-Forwarded-For header should be respected. + // It's important to set this to 'true' when accepting connections + // from untrusted clients, as a malicious client could spoof its + // IP address by simply setting this header. It's meant to be added + // by a trusted proxy or other intermediary within your own + // infrastructure. + // See: http://en.wikipedia.org/wiki/X-Forwarded-For + ignoreXForwardedFor: false, + + // The Nagle Algorithm makes more efficient use of network resources + // by introducing a small delay before sending small packets so that + // multiple messages can be batched together before going onto the + // wire. This however comes at the cost of latency, so the default + // is to disable it. If you don't need low latency and are streaming + // lots of small messages, you can change this to 'false' + disableNagleAlgorithm: true, + + // The number of milliseconds to wait after sending a close frame + // for an acknowledgement to come back before giving up and just + // closing the socket. + closeTimeout: 5000 + }; + extend(this.config, config); + + if (this.config.httpServer) { + if (!Array.isArray(this.config.httpServer)) { + this.config.httpServer = [this.config.httpServer]; + } + var upgradeHandler = this._handlers.upgrade; + this.config.httpServer.forEach(function(httpServer) { + httpServer.on('upgrade', upgradeHandler); + }); + } + else { + throw new Error('You must specify an httpServer on which to mount the WebSocket server.'); + } +}; + +WebSocketServer.prototype.unmount = function() { + var upgradeHandler = this._handlers.upgrade; + this.config.httpServer.forEach(function(httpServer) { + httpServer.removeListener('upgrade', upgradeHandler); + }); +}; + +WebSocketServer.prototype.closeAllConnections = function() { + this.connections.forEach(function(connection) { + connection.close(); + }); + this.pendingRequests.forEach(function(request) { + process.nextTick(function() { + request.reject(503); // HTTP 503 Service Unavailable + }); + }); +}; + +WebSocketServer.prototype.broadcast = function(data) { + if (Buffer.isBuffer(data)) { + this.broadcastBytes(data); + } + else if (typeof(data.toString) === 'function') { + this.broadcastUTF(data); + } +}; + +WebSocketServer.prototype.broadcastUTF = function(utfData) { + this.connections.forEach(function(connection) { + connection.sendUTF(utfData); + }); +}; + +WebSocketServer.prototype.broadcastBytes = function(binaryData) { + this.connections.forEach(function(connection) { + connection.sendBytes(binaryData); + }); +}; + +WebSocketServer.prototype.shutDown = function() { + this.unmount(); + this.closeAllConnections(); +}; + +WebSocketServer.prototype.handleUpgrade = function(request, socket) { + var wsRequest = new WebSocketRequest(socket, request, this.config); + try { + wsRequest.readHandshake(); + } + catch(e) { + wsRequest.reject( + e.httpCode ? e.httpCode : 400, + e.message, + e.headers + ); + return; + } + + this.pendingRequests.push(wsRequest); + + wsRequest.once('requestAccepted', this._handlers.requestAccepted); + wsRequest.once('requestResolved', this._handlers.requestResolved); + + if (!this.config.autoAcceptConnections && utils.eventEmitterListenerCount(this, 'request') > 0) { + this.emit('request', wsRequest); + } + else if (this.config.autoAcceptConnections) { + wsRequest.accept(wsRequest.requestedProtocols[0], wsRequest.origin); + } + else { + wsRequest.reject(404, 'No handler is configured to accept the connection.'); + } +}; + +WebSocketServer.prototype.handleRequestAccepted = function(connection) { + var self = this; + connection.once('close', function(closeReason, description) { + self.handleConnectionClose(connection, closeReason, description); + }); + this.connections.push(connection); + this.emit('connect', connection); +}; + +WebSocketServer.prototype.handleConnectionClose = function(connection, closeReason, description) { + var index = this.connections.indexOf(connection); + if (index !== -1) { + this.connections.splice(index, 1); + } + this.emit('close', connection, closeReason, description); +}; + +WebSocketServer.prototype.handleRequestResolved = function(request) { + var index = this.pendingRequests.indexOf(request); + if (index !== -1) { this.pendingRequests.splice(index, 1); } +}; + +module.exports = WebSocketServer; |