summaryrefslogtreecommitdiffhomepage
path: root/src/nodejs
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2019-12-24 18:04:17 +0300
committerMax Romanov <max.romanov@nginx.com>2019-12-24 18:04:17 +0300
commit763bdff4018ec35de8383273d366160adebb6021 (patch)
tree69da5e8b14c436142a84717851edbd00d84b0575 /src/nodejs
parentdf7caf465072e171f88358b9e69c65b76d8efd25 (diff)
downloadunit-763bdff4018ec35de8383273d366160adebb6021.tar.gz
unit-763bdff4018ec35de8383273d366160adebb6021.tar.bz2
Node.js: implementing output message drain using SHM_ACK feature.
ServerResponse.write() method tries to write data buffer using libunit and stores buffers to write in a Server-wide output queue, which is processed in response to SHM_ACK message from router. As a side effect 'drain' event implemented and socket.writable flag reflect current state.
Diffstat (limited to 'src/nodejs')
-rw-r--r--src/nodejs/unit-http/http_server.js121
-rw-r--r--src/nodejs/unit-http/unit.cpp90
-rw-r--r--src/nodejs/unit-http/unit.h3
3 files changed, 188 insertions, 26 deletions
diff --git a/src/nodejs/unit-http/http_server.js b/src/nodejs/unit-http/http_server.js
index c42149a5..2f324329 100644
--- a/src/nodejs/unit-http/http_server.js
+++ b/src/nodejs/unit-http/http_server.js
@@ -227,6 +227,7 @@ ServerResponse.prototype._write = unit_lib.response_write;
ServerResponse.prototype._writeBody = function(chunk, encoding, callback) {
var contentLength = 0;
+ var res, o;
this._sendHeaders();
@@ -247,11 +248,32 @@ ServerResponse.prototype._writeBody = function(chunk, encoding, callback) {
if (typeof chunk === 'string') {
contentLength = Buffer.byteLength(chunk, encoding);
+ if (contentLength > unit_lib.buf_min) {
+ chunk = Buffer.from(chunk, encoding);
+
+ contentLength = chunk.length;
+ }
+
} else {
contentLength = chunk.length;
}
- this._write(chunk, contentLength);
+ if (this.server._output.length > 0 || !this.socket.writable) {
+ o = new BufferedOutput(this, 0, chunk, encoding, callback);
+ this.server._output.push(o);
+
+ return false;
+ }
+
+ res = this._write(chunk, 0, contentLength);
+ if (res < contentLength) {
+ this.socket.writable = false;
+
+ o = new BufferedOutput(this, res, chunk, encoding, callback);
+ this.server._output.push(o);
+
+ return false;
+ }
}
if (typeof callback === 'function') {
@@ -265,29 +287,48 @@ ServerResponse.prototype._writeBody = function(chunk, encoding, callback) {
* the event loop. All callbacks passed to process.nextTick()
* will be resolved before the event loop continues.
*/
- process.nextTick(function () {
- callback(this);
- }.bind(this));
+ process.nextTick(callback);
}
+
+ return true;
};
ServerResponse.prototype.write = function write(chunk, encoding, callback) {
if (this.finished) {
- throw new Error("Write after end");
- }
+ if (typeof encoding === 'function') {
+ callback = encoding;
+ encoding = null;
+ }
- this._writeBody(chunk, encoding, callback);
+ var err = new Error("Write after end");
+ process.nextTick(() => {
+ this.emit('error', err);
- return true;
+ if (typeof callback === 'function') {
+ callback(err);
+ }
+ })
+ }
+
+ return this._writeBody(chunk, encoding, callback);
};
ServerResponse.prototype._end = unit_lib.response_end;
ServerResponse.prototype.end = function end(chunk, encoding, callback) {
if (!this.finished) {
- this._writeBody(chunk, encoding, callback);
+ if (typeof encoding === 'function') {
+ callback = encoding;
+ encoding = null;
+ }
- this._end();
+ this._writeBody(chunk, encoding, () => {
+ this._end();
+
+ if (typeof callback === 'function') {
+ callback();
+ }
+ });
this.finished = true;
}
@@ -393,6 +434,9 @@ function Server(requestListener) {
this._upgradeListenerCount--;
}
});
+
+ this._output = [];
+ this._drain_resp = new Set();
}
util.inherits(Server, EventEmitter);
@@ -429,6 +473,63 @@ Server.prototype.emit_close = function () {
this.emit('close');
};
+Server.prototype.emit_drain = function () {
+ var res, o, l;
+
+ if (this._output.length <= 0) {
+ return;
+ }
+
+ while (this._output.length > 0) {
+ o = this._output[0];
+
+ if (typeof o.chunk === 'string') {
+ l = Buffer.byteLength(o.chunk, o.encoding);
+
+ } else {
+ l = o.chunk.length;
+ }
+
+ res = o.resp._write(o.chunk, o.offset, l);
+
+ o.offset += res;
+ if (o.offset < l) {
+ return;
+ }
+
+ this._drain_resp.add(o.resp);
+
+ if (typeof o.callback === 'function') {
+ process.nextTick(o.callback);
+ }
+
+ this._output.shift();
+ }
+
+ for (var resp of this._drain_resp) {
+
+ if (resp.socket.writable) {
+ continue;
+ }
+
+ resp.socket.writable = true;
+
+ process.nextTick(() => {
+ resp.emit("drain");
+ });
+ }
+
+ this._drain_resp.clear();
+};
+
+function BufferedOutput(resp, offset, chunk, encoding, callback) {
+ this.resp = resp;
+ this.offset = offset;
+ this.chunk = chunk;
+ this.encoding = encoding;
+ this.callback = callback;
+}
+
function connectionListener(socket) {
}
diff --git a/src/nodejs/unit-http/unit.cpp b/src/nodejs/unit-http/unit.cpp
index 10875703..1fa73689 100644
--- a/src/nodejs/unit-http/unit.cpp
+++ b/src/nodejs/unit-http/unit.cpp
@@ -70,6 +70,8 @@ Unit::init(napi_env env, napi_value exports)
websocket_send_frame);
napi.set_named_property(exports, "websocket_set_sock",
websocket_set_sock);
+ napi.set_named_property(exports, "buf_min", nxt_unit_buf_min());
+ napi.set_named_property(exports, "buf_max", nxt_unit_buf_max());
} catch (exception &e) {
napi.throw_error(e);
@@ -148,6 +150,7 @@ Unit::create_server(napi_env env, napi_callback_info info)
unit_init.callbacks.request_handler = request_handler_cb;
unit_init.callbacks.websocket_handler = websocket_handler_cb;
unit_init.callbacks.close_handler = close_handler_cb;
+ unit_init.callbacks.shm_ack_handler = shm_ack_handler_cb;
unit_init.callbacks.add_port = add_port;
unit_init.callbacks.remove_port = remove_port;
unit_init.callbacks.quit = quit_cb;
@@ -308,6 +311,40 @@ Unit::close_handler(nxt_unit_request_info_t *req)
}
+void
+Unit::shm_ack_handler_cb(nxt_unit_ctx_t *ctx)
+{
+ Unit *obj;
+
+ obj = reinterpret_cast<Unit *>(ctx->unit->data);
+
+ obj->shm_ack_handler(ctx);
+}
+
+
+void
+Unit::shm_ack_handler(nxt_unit_ctx_t *ctx)
+{
+ napi_value server_obj, emit_drain;
+
+ try {
+ nxt_handle_scope scope(env());
+
+ server_obj = get_server_object();
+
+ emit_drain = get_named_property(server_obj, "emit_drain");
+
+ nxt_async_context async_context(env(), "shm_ack_handler");
+ nxt_callback_scope async_scope(async_context);
+
+ make_callback(async_context, server_obj, emit_drain);
+
+ } catch (exception &e) {
+ nxt_unit_warn(ctx, "shm_ack_handler: %s", e.str);
+ }
+}
+
+
static void
nxt_uv_read_callback(uv_poll_t *handle, int status, int events)
{
@@ -748,47 +785,68 @@ Unit::response_write(napi_env env, napi_callback_info info)
int ret;
void *ptr;
size_t argc, have_buf_len;
- uint32_t buf_len;
+ ssize_t res_len;
+ uint32_t buf_start, buf_len;
nxt_napi napi(env);
napi_value this_arg;
nxt_unit_buf_t *buf;
napi_valuetype buf_type;
nxt_unit_request_info_t *req;
- napi_value argv[2];
+ napi_value argv[3];
- argc = 2;
+ argc = 3;
try {
this_arg = napi.get_cb_info(info, argc, argv);
- if (argc != 2) {
+ if (argc != 3) {
throw exception("Wrong args count. Expected: "
- "chunk, chunk length");
+ "chunk, start, length");
}
req = napi.get_request_info(this_arg);
buf_type = napi.type_of(argv[0]);
- buf_len = napi.get_value_uint32(argv[1]) + 1;
-
- buf = nxt_unit_response_buf_alloc(req, buf_len);
- if (buf == NULL) {
- throw exception("Failed to allocate response buffer");
- }
+ buf_start = napi.get_value_uint32(argv[1]);
+ buf_len = napi.get_value_uint32(argv[2]) + 1;
if (buf_type == napi_string) {
/* TODO: will work only for utf8 content-type */
+ if (req->response_buf != NULL
+ && (req->response_buf->end - req->response_buf->free)
+ >= buf_len)
+ {
+ buf = req->response_buf;
+
+ } else {
+ buf = nxt_unit_response_buf_alloc(req, buf_len);
+ if (buf == NULL) {
+ throw exception("Failed to allocate response buffer");
+ }
+ }
+
have_buf_len = napi.get_value_string_utf8(argv[0], buf->free,
buf_len);
+ buf->free += have_buf_len;
+
+ ret = nxt_unit_buf_send(buf);
+ if (ret == NXT_UNIT_OK) {
+ res_len = have_buf_len;
+ }
+
} else {
ptr = napi.get_buffer_info(argv[0], have_buf_len);
- memcpy(buf->free, ptr, have_buf_len);
- }
+ if (buf_start > 0) {
+ ptr = ((uint8_t *) ptr) + buf_start;
+ have_buf_len -= buf_start;
+ }
- buf->free += have_buf_len;
+ res_len = nxt_unit_response_write_nb(req, ptr, have_buf_len, 0);
+
+ ret = res_len < 0 ? -res_len : NXT_UNIT_OK;
+ }
- ret = nxt_unit_buf_send(buf);
if (ret != NXT_UNIT_OK) {
throw exception("Failed to send body buf");
}
@@ -797,7 +855,7 @@ Unit::response_write(napi_env env, napi_callback_info info)
return nullptr;
}
- return this_arg;
+ return napi.create((int64_t) res_len);
}
diff --git a/src/nodejs/unit-http/unit.h b/src/nodejs/unit-http/unit.h
index f5eaf9fd..18359118 100644
--- a/src/nodejs/unit-http/unit.h
+++ b/src/nodejs/unit-http/unit.h
@@ -36,6 +36,9 @@ private:
static void close_handler_cb(nxt_unit_request_info_t *req);
void close_handler(nxt_unit_request_info_t *req);
+ static void shm_ack_handler_cb(nxt_unit_ctx_t *ctx);
+ void shm_ack_handler(nxt_unit_ctx_t *ctx);
+
static int add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
static void remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);