diff options
author | Max Romanov <max.romanov@nginx.com> | 2019-12-24 18:04:17 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2019-12-24 18:04:17 +0300 |
commit | 763bdff4018ec35de8383273d366160adebb6021 (patch) | |
tree | 69da5e8b14c436142a84717851edbd00d84b0575 /src/nodejs/unit-http/http_server.js | |
parent | df7caf465072e171f88358b9e69c65b76d8efd25 (diff) | |
download | unit-763bdff4018ec35de8383273d366160adebb6021.tar.gz unit-763bdff4018ec35de8383273d366160adebb6021.tar.bz2 |
Node.js: implementing output message drain using SHM_ACK feature.
ServerResponse.write() method tries to write data buffer using libunit
and stores buffers to write in a Server-wide output queue, which is
processed in response to SHM_ACK message from router.
As a side effect 'drain' event implemented and socket.writable flag
reflect current state.
Diffstat (limited to '')
-rw-r--r-- | src/nodejs/unit-http/http_server.js | 121 |
1 files changed, 111 insertions, 10 deletions
diff --git a/src/nodejs/unit-http/http_server.js b/src/nodejs/unit-http/http_server.js index c42149a5..2f324329 100644 --- a/src/nodejs/unit-http/http_server.js +++ b/src/nodejs/unit-http/http_server.js @@ -227,6 +227,7 @@ ServerResponse.prototype._write = unit_lib.response_write; ServerResponse.prototype._writeBody = function(chunk, encoding, callback) { var contentLength = 0; + var res, o; this._sendHeaders(); @@ -247,11 +248,32 @@ ServerResponse.prototype._writeBody = function(chunk, encoding, callback) { if (typeof chunk === 'string') { contentLength = Buffer.byteLength(chunk, encoding); + if (contentLength > unit_lib.buf_min) { + chunk = Buffer.from(chunk, encoding); + + contentLength = chunk.length; + } + } else { contentLength = chunk.length; } - this._write(chunk, contentLength); + if (this.server._output.length > 0 || !this.socket.writable) { + o = new BufferedOutput(this, 0, chunk, encoding, callback); + this.server._output.push(o); + + return false; + } + + res = this._write(chunk, 0, contentLength); + if (res < contentLength) { + this.socket.writable = false; + + o = new BufferedOutput(this, res, chunk, encoding, callback); + this.server._output.push(o); + + return false; + } } if (typeof callback === 'function') { @@ -265,29 +287,48 @@ ServerResponse.prototype._writeBody = function(chunk, encoding, callback) { * the event loop. All callbacks passed to process.nextTick() * will be resolved before the event loop continues. */ - process.nextTick(function () { - callback(this); - }.bind(this)); + process.nextTick(callback); } + + return true; }; ServerResponse.prototype.write = function write(chunk, encoding, callback) { if (this.finished) { - throw new Error("Write after end"); - } + if (typeof encoding === 'function') { + callback = encoding; + encoding = null; + } - this._writeBody(chunk, encoding, callback); + var err = new Error("Write after end"); + process.nextTick(() => { + this.emit('error', err); - return true; + if (typeof callback === 'function') { + callback(err); + } + }) + } + + return this._writeBody(chunk, encoding, callback); }; ServerResponse.prototype._end = unit_lib.response_end; ServerResponse.prototype.end = function end(chunk, encoding, callback) { if (!this.finished) { - this._writeBody(chunk, encoding, callback); + if (typeof encoding === 'function') { + callback = encoding; + encoding = null; + } - this._end(); + this._writeBody(chunk, encoding, () => { + this._end(); + + if (typeof callback === 'function') { + callback(); + } + }); this.finished = true; } @@ -393,6 +434,9 @@ function Server(requestListener) { this._upgradeListenerCount--; } }); + + this._output = []; + this._drain_resp = new Set(); } util.inherits(Server, EventEmitter); @@ -429,6 +473,63 @@ Server.prototype.emit_close = function () { this.emit('close'); }; +Server.prototype.emit_drain = function () { + var res, o, l; + + if (this._output.length <= 0) { + return; + } + + while (this._output.length > 0) { + o = this._output[0]; + + if (typeof o.chunk === 'string') { + l = Buffer.byteLength(o.chunk, o.encoding); + + } else { + l = o.chunk.length; + } + + res = o.resp._write(o.chunk, o.offset, l); + + o.offset += res; + if (o.offset < l) { + return; + } + + this._drain_resp.add(o.resp); + + if (typeof o.callback === 'function') { + process.nextTick(o.callback); + } + + this._output.shift(); + } + + for (var resp of this._drain_resp) { + + if (resp.socket.writable) { + continue; + } + + resp.socket.writable = true; + + process.nextTick(() => { + resp.emit("drain"); + }); + } + + this._drain_resp.clear(); +}; + +function BufferedOutput(resp, offset, chunk, encoding, callback) { + this.resp = resp; + this.offset = offset; + this.chunk = chunk; + this.encoding = encoding; + this.callback = callback; +} + function connectionListener(socket) { } |