summaryrefslogtreecommitdiffhomepage
path: root/src/nodejs
diff options
context:
space:
mode:
authorKonstantin Pavlov <thresh@nginx.com>2018-11-15 16:23:35 +0300
committerKonstantin Pavlov <thresh@nginx.com>2018-11-15 16:23:35 +0300
commit6ccba253f8d415337a09fb935606447791ce308c (patch)
treee0f9a8c5e8ede8cef1500c316d7534dd8de7b972 /src/nodejs
parentbdde42999b36af85f2f04c0872fdd3e30af52027 (diff)
parenta4b02e17382ccbfc19410c644004c4615b2c2c29 (diff)
downloadunit-6ccba253f8d415337a09fb935606447791ce308c.tar.gz
unit-6ccba253f8d415337a09fb935606447791ce308c.tar.bz2
Merged with the default branch.1.6-1
Diffstat (limited to '')
-rw-r--r--src/nodejs/unit-http/README.md20
-rwxr-xr-xsrc/nodejs/unit-http/http_server.js35
-rwxr-xr-xsrc/nodejs/unit-http/socket.js38
-rw-r--r--src/nodejs/unit-http/unit.cpp291
-rw-r--r--src/nodejs/unit-http/unit.h9
5 files changed, 262 insertions, 131 deletions
diff --git a/src/nodejs/unit-http/README.md b/src/nodejs/unit-http/README.md
index 71a4067a..b6b975e4 100644
--- a/src/nodejs/unit-http/README.md
+++ b/src/nodejs/unit-http/README.md
@@ -1,21 +1,5 @@
+[<img src="https://unit.nginx.org/_static/logo.svg" width="40%">](https://unit.nginx.org)
+
# Node.js Package for NGINX Unit
-[<img src="https://unit.nginx.org/_static/logo.svg" width=150px>](https://unit.nginx.org)
-Node.js support package for NGINX Unit.
For details, see [NGINX Unit documentation](https://unit.nginx.org).
-
-## Installation
-
-```bash
-npm i unit-http
-```
-
-## Usage
-
-```javascript
-var http = require('unit-http');
-```
-
-## License
-
-Apache 2.0
diff --git a/src/nodejs/unit-http/http_server.js b/src/nodejs/unit-http/http_server.js
index fa7b8e9b..57163c0b 100755
--- a/src/nodejs/unit-http/http_server.js
+++ b/src/nodejs/unit-http/http_server.js
@@ -78,7 +78,7 @@ ServerResponse.prototype.setHeader = function setHeader(key, value) {
this.removeHeader(key);
- this.headers[key] = value + "";
+ this.headers[key] = value;
this.headers_len += header_len + (header_key_len * header_count);
this.headers_count += header_count;
};
@@ -227,12 +227,11 @@ ServerResponse.prototype._writeBody = function(chunk, encoding, callback) {
ServerResponse.prototype.write = function write(chunk, encoding, callback) {
this._writeBody(chunk, encoding, callback);
- return this;
+ return true;
};
ServerResponse.prototype.end = function end(chunk, encoding, callback) {
this._writeBody(chunk, encoding, callback);
- unit_lib.unit_response_end(this)
this.finished = true;
@@ -290,10 +289,10 @@ function Server(requestListener) {
EventEmitter.call(this);
this.unit = new unit_lib.Unit();
- this.unit.createServer();
-
this.unit.server = this;
+ this.unit.createServer();
+
this.socket = Socket;
this.request = ServerRequest;
this.response = ServerResponse;
@@ -318,10 +317,34 @@ Server.prototype.listen = function () {
this.unit.listen();
};
+Server.prototype.run_events = function (server, req, res) {
+ /* Important!!! setImmediate starts the next iteration in Node.js loop. */
+ setImmediate(function () {
+ server.emit("request", req, res);
+
+ Promise.resolve().then(() => {
+ let buf = server.unit._read(req.socket.req_pointer);
+
+ if (buf.length != 0) {
+ req.emit("data", buf);
+ }
+
+ req.emit("end");
+ });
+
+ Promise.resolve().then(() => {
+ req.emit("finish");
+
+ if (res.finished) {
+ unit_lib.unit_response_end(res);
+ }
+ });
+ });
+};
+
function connectionListener(socket) {
}
-
module.exports = {
STATUS_CODES: http.STATUS_CODES,
Server,
diff --git a/src/nodejs/unit-http/socket.js b/src/nodejs/unit-http/socket.js
index 89702834..aef065bf 100755
--- a/src/nodejs/unit-http/socket.js
+++ b/src/nodejs/unit-http/socket.js
@@ -12,15 +12,16 @@ const unit_lib = require('unit-http/build/Release/unit-http.node');
function Socket(options) {
EventEmitter.call(this);
- if (typeof options === 'number') {
- options = { fd: options };
+ options = options || {};
- } else if (options === undefined) {
- options = {};
+ if (typeof options !== 'object') {
+ throw new TypeError('Options must be object');
}
- this.readable = options.readable !== false;
- this.writable = options.writable !== false;
+ this.readable = (typeof options.readable === 'boolean' ? options.readable
+ : false);
+ this.writable = (typeof options.writable === 'boolean' ? options.writable
+ : false);
}
util.inherits(Socket, EventEmitter);
@@ -38,25 +39,24 @@ Socket.prototype.remotePort = 0;
Socket.prototype.address = function address() {
};
-Socket.prototype.connect = function connect(options, callback) {
- if (callback !== null) {
- this.once('connect', cb);
- }
+Socket.prototype.connect = function connect(options, connectListener) {
+ this.once('connect', connectListener);
this.connecting = true;
this.writable = true;
-};
-Socket.prototype.address = function address() {
+ return this;
};
Socket.prototype.destroy = function destroy(exception) {
this.connecting = false;
this.readable = false;
this.writable = false;
+
+ return this;
};
-Socket.prototype.end = function end(data, encoding) {
+Socket.prototype.end = function end(data, encoding, callback) {
};
Socket.prototype.pause = function pause() {
@@ -77,13 +77,15 @@ Socket.prototype.setKeepAlive = function setKeepAlive(enable, initialDelay) {
Socket.prototype.setNoDelay = function setNoDelay(noDelay) {
};
-Socket.prototype.setTimeout = function setTimeout(msecs, callback) {
- this.timeout = msecs;
-
- if (callback) {
- this.on('timeout', callback);
+Socket.prototype.setTimeout = function setTimeout(timeout, callback) {
+ if (typeof timeout !== 'number') {
+ throw new TypeError('Timeout must be number');
}
+ this.timeout = timeout;
+
+ this.on('timeout', callback);
+
return this;
};
diff --git a/src/nodejs/unit-http/unit.cpp b/src/nodejs/unit-http/unit.cpp
index 40f641a6..be64a59b 100644
--- a/src/nodejs/unit-http/unit.cpp
+++ b/src/nodejs/unit-http/unit.cpp
@@ -5,10 +5,21 @@
#include "unit.h"
+#include <unistd.h>
+#include <fcntl.h>
+
+#include <uv.h>
+
napi_ref Unit::constructor_;
+struct nxt_nodejs_ctx_t {
+ nxt_unit_port_id_t port_id;
+ uv_poll_t poll;
+};
+
+
Unit::Unit(napi_env env):
env_(env),
wrapper_(nullptr),
@@ -31,11 +42,12 @@ Unit::init(napi_env env, napi_value exports)
napi_property_descriptor properties[] = {
{ "createServer", 0, create_server, 0, 0, 0, napi_default, 0 },
- { "listen", 0, listen, 0, 0, 0, napi_default, 0 }
+ { "listen", 0, listen, 0, 0, 0, napi_default, 0 },
+ { "_read", 0, _read, 0, 0, 0, napi_default, 0 }
};
status = napi_define_class(env, "Unit", NAPI_AUTO_LENGTH, create, nullptr,
- 2, properties, &cons);
+ 3, properties, &cons);
if (status != napi_ok) {
goto failed;
}
@@ -105,6 +117,7 @@ napi_value
Unit::create(napi_env env, napi_callback_info info)
{
Unit *obj;
+ napi_ref ref;
napi_value target, cons, instance, jsthis;
napi_status status;
@@ -129,6 +142,11 @@ Unit::create(napi_env env, napi_callback_info info)
goto failed;
}
+ status = napi_create_reference(env, jsthis, 1, &ref);
+ if (status != napi_ok) {
+ goto failed;
+ }
+
return jsthis;
}
@@ -143,6 +161,11 @@ Unit::create(napi_env env, napi_callback_info info)
goto failed;
}
+ status = napi_create_reference(env, instance, 1, &ref);
+ if (status != napi_ok) {
+ goto failed;
+ }
+
return instance;
failed:
@@ -158,14 +181,13 @@ Unit::create_server(napi_env env, napi_callback_info info)
{
Unit *obj;
size_t argc;
- napi_value jsthis;
+ napi_value jsthis, argv;
napi_status status;
- napi_value argv[1];
nxt_unit_init_t unit_init;
argc = 1;
- status = napi_get_cb_info(env, info, &argc, argv, &jsthis, nullptr);
+ status = napi_get_cb_info(env, info, &argc, &argv, &jsthis, nullptr);
if (status != napi_ok) {
goto failed;
}
@@ -179,6 +201,9 @@ Unit::create_server(napi_env env, napi_callback_info info)
unit_init.data = obj;
unit_init.callbacks.request_handler = request_handler;
+ unit_init.callbacks.add_port = add_port;
+ unit_init.callbacks.remove_port = remove_port;
+ unit_init.callbacks.quit = quit;
obj->unit_ctx_ = nxt_unit_init(&unit_init);
if (obj->unit_ctx_ == NULL) {
@@ -198,41 +223,53 @@ failed:
napi_value
Unit::listen(napi_env env, napi_callback_info info)
{
- int ret;
- Unit *obj;
- napi_value jsthis;
- napi_status status;
+ return nullptr;
+}
+
- status = napi_get_cb_info(env, info, nullptr, nullptr, &jsthis, nullptr);
+napi_value
+Unit::_read(napi_env env, napi_callback_info info)
+{
+ Unit *obj;
+ void *data;
+ size_t argc;
+ int64_t req_pointer;
+ napi_value jsthis, buffer, argv;
+ napi_status status;
+ nxt_unit_request_info_t *req;
+
+ argc = 1;
+
+ status = napi_get_cb_info(env, info, &argc, &argv, &jsthis, nullptr);
if (status != napi_ok) {
- goto failed;
+ napi_throw_error(env, NULL, "Failed to get arguments from js");
+ return nullptr;
}
status = napi_unwrap(env, jsthis, reinterpret_cast<void **>(&obj));
if (status != napi_ok) {
- goto failed;
- }
-
- if (obj->unit_ctx_ == NULL) {
- napi_throw_error(env, NULL, "Unit context was not created");
+ napi_throw_error(env, NULL, "Failed to get Unit object form js");
return nullptr;
}
- ret = nxt_unit_run(obj->unit_ctx_);
- if (ret != NXT_UNIT_OK) {
- napi_throw_error(env, NULL, "Failed to run Unit");
+ status = napi_get_value_int64(env, argv, &req_pointer);
+ if (status != napi_ok) {
+ napi_throw_error(env, NULL, "Failed to get request pointer");
return nullptr;
}
- nxt_unit_done(obj->unit_ctx_);
-
- return nullptr;
+ req = (nxt_unit_request_info_t *) (uintptr_t) req_pointer;
-failed:
+ status = napi_create_buffer(env, (size_t) req->content_length,
+ &data, &buffer);
+ if (status != napi_ok) {
+ napi_throw_error(env, NULL, "Failed to create request buffer");
+ return nullptr;
+ }
- napi_throw_error(env, NULL, "Failed to listen Unit socket");
+ nxt_unit_request_read(req, data, req->content_length);
- return nullptr;
+ return buffer;
}
@@ -242,8 +279,9 @@ Unit::request_handler(nxt_unit_request_info_t *req)
Unit *obj;
napi_value socket, request, response;
napi_value global, server_obj;
- napi_value req_argv[3];
+ napi_value run_events, events_res;
napi_status status;
+ napi_value events_args[3];
obj = reinterpret_cast<Unit *>(req->unit->data);
@@ -284,73 +322,143 @@ Unit::request_handler(nxt_unit_request_info_t *req)
return;
}
- req_argv[1] = request;
- req_argv[2] = response;
-
status = obj->create_headers(req, request);
if (status != napi_ok) {
napi_throw_error(obj->env_, NULL, "Failed to create headers");
return;
}
- obj->emit(server_obj, "request", sizeof("request") - 1, 3, req_argv);
- obj->emit_post_data(request, req);
+ status = napi_get_named_property(obj->env_, server_obj, "run_events",
+ &run_events);
+ if (status != napi_ok) {
+ napi_throw_error(obj->env_, NULL, "Failed to get"
+ " 'run_events' function");
+ return;
+ }
+
+ events_args[0] = server_obj;
+ events_args[1] = request;
+ events_args[2] = response;
+
+ status = napi_call_function(obj->env_, server_obj, run_events, 3,
+ events_args, &events_res);
+ if (status != napi_ok) {
+ napi_throw_error(obj->env_, NULL, "Failed to call"
+ " 'run_events' function");
+ return;
+ }
napi_close_handle_scope(obj->env_, scope);
}
-napi_value
-Unit::get_server_object()
+void
+nxt_uv_read_callback(uv_poll_t *handle, int status, int events)
{
- napi_value unit_obj, server_obj;
- napi_status status;
+ nxt_unit_run_once((nxt_unit_ctx_t *) handle->data);
+}
- status = napi_get_reference_value(env_, wrapper_, &unit_obj);
- if (status != napi_ok) {
- return nullptr;
+
+int
+Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
+{
+ int err;
+ Unit *obj;
+ uv_loop_t *loop;
+ napi_status status;
+ nxt_nodejs_ctx_t *node_ctx;
+
+ if (port->in_fd != -1) {
+ obj = reinterpret_cast<Unit *>(ctx->unit->data);
+
+ if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) {
+ napi_throw_error(obj->env_, NULL, "Failed to upgrade read"
+ " file descriptor to O_NONBLOCK");
+ return -1;
+ }
+
+ status = napi_get_uv_event_loop(obj->env_, &loop);
+ if (status != napi_ok) {
+ napi_throw_error(obj->env_, NULL, "Failed to get uv.loop");
+ return NXT_UNIT_ERROR;
+ }
+
+ node_ctx = new nxt_nodejs_ctx_t;
+
+ err = uv_poll_init(loop, &node_ctx->poll, port->in_fd);
+ if (err < 0) {
+ napi_throw_error(obj->env_, NULL, "Failed to init uv.poll");
+ return NXT_UNIT_ERROR;
+ }
+
+ err = uv_poll_start(&node_ctx->poll, UV_READABLE, nxt_uv_read_callback);
+ if (err < 0) {
+ napi_throw_error(obj->env_, NULL, "Failed to start uv.poll");
+ return NXT_UNIT_ERROR;
+ }
+
+ ctx->data = node_ctx;
+
+ node_ctx->port_id = port->id;
+ node_ctx->poll.data = ctx;
}
- status = napi_get_named_property(env_, unit_obj, "server", &server_obj);
- if (status != napi_ok) {
- return nullptr;
+ return nxt_unit_add_port(ctx, port);
+}
+
+
+inline bool
+operator == (const nxt_unit_port_id_t &p1, const nxt_unit_port_id_t &p2)
+{
+ return p1.pid == p2.pid && p1.id == p2.id;
+}
+
+
+void
+Unit::remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
+{
+ nxt_nodejs_ctx_t *node_ctx;
+
+ if (ctx->data != NULL) {
+ node_ctx = (nxt_nodejs_ctx_t *) ctx->data;
+
+ if (node_ctx->port_id == *port_id) {
+ uv_poll_stop(&node_ctx->poll);
+
+ delete node_ctx;
+
+ ctx->data = NULL;
+ }
}
- return server_obj;
+ nxt_unit_remove_port(ctx, port_id);
+}
+
+
+void
+Unit::quit(nxt_unit_ctx_t *ctx)
+{
+ nxt_unit_done(ctx);
}
napi_value
-Unit::emit(napi_value obj, const char *name, size_t name_len, size_t argc,
- napi_value *argv)
+Unit::get_server_object()
{
- napi_value emitter, return_val, str;
+ napi_value unit_obj, server_obj;
napi_status status;
- status = napi_get_named_property(env_, obj, "emit", &emitter);
- if (status != napi_ok) {
- return nullptr;
- }
-
- status = napi_create_string_latin1(env_, name, name_len, &str);
+ status = napi_get_reference_value(env_, wrapper_, &unit_obj);
if (status != napi_ok) {
return nullptr;
}
- if (argc != 0) {
- argv[0] = str;
-
- } else {
- argc = 1;
- argv = &str;
- }
-
- status = napi_call_function(env_, obj, emitter, argc, argv, &return_val);
+ status = napi_get_named_property(env_, unit_obj, "server", &server_obj);
if (status != napi_ok) {
return nullptr;
}
- return return_val;
+ return server_obj;
}
@@ -391,7 +499,7 @@ Unit::create_headers(nxt_unit_request_info_t *req, napi_value request)
return status;
}
- status = napi_set_named_property(env_, request, "raw_headers", raw_headers);
+ status = napi_set_named_property(env_, request, "rawHeaders", raw_headers);
if (status != napi_ok) {
return status;
}
@@ -480,7 +588,7 @@ Unit::append_header(nxt_unit_field_t *f, napi_value headers,
napi_value
Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req)
{
- napi_value constructor, return_val;
+ napi_value constructor, return_val, req_pointer;
napi_status status;
status = napi_get_named_property(env_, server_obj, "socket",
@@ -494,6 +602,17 @@ Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req)
return nullptr;
}
+ status = napi_create_int64(env_, (uintptr_t) req, &req_pointer);
+ if (status != napi_ok) {
+ return nullptr;
+ }
+
+ status = napi_set_named_property(env_, return_val, "req_pointer",
+ req_pointer);
+ if (status != napi_ok) {
+ return nullptr;
+ }
+
return return_val;
}
@@ -563,27 +682,6 @@ Unit::create_response(napi_value server_obj, napi_value socket,
}
-void
-Unit::emit_post_data(napi_value request, nxt_unit_request_info_t *req)
-{
- void *data;
- napi_value req_argv[2];
- napi_status status;
-
- status = napi_create_buffer(env_, (size_t) req->content_length,
- &data, &req_argv[1]);
- if (status != napi_ok) {
- napi_throw_error(env_, NULL, "Failed to create request buffer");
- return;
- }
-
- nxt_unit_request_read(req, data, req->content_length);
-
- emit(request, "data", sizeof("data") - 1, 2, req_argv);
- emit(request, "end", sizeof("end") - 1, 0, nullptr);
-}
-
-
napi_value
Unit::response_send_headers(napi_env env, napi_callback_info info)
{
@@ -598,6 +696,7 @@ Unit::response_send_headers(napi_env env, napi_callback_info info)
napi_value this_arg, headers, keys, name, value, array_val;
napi_value req_num;
napi_status status;
+ napi_valuetype val_type;
nxt_unit_field_t *f;
nxt_unit_request_info_t *req;
napi_value argv[5];
@@ -707,6 +806,18 @@ Unit::response_send_headers(napi_env env, napi_callback_info info)
goto failed;
}
+ napi_typeof(env, array_val, &val_type);
+ if (status != napi_ok) {
+ goto failed;
+ }
+
+ if (val_type != napi_string) {
+ status = napi_coerce_to_string(env, array_val, &array_val);
+ if (status != napi_ok) {
+ goto failed;
+ }
+ }
+
status = napi_get_value_string_latin1(env, array_val, ptr,
header_len,
&value_len);
@@ -732,6 +843,18 @@ Unit::response_send_headers(napi_env env, napi_callback_info info)
}
} else {
+ napi_typeof(env, value, &val_type);
+ if (status != napi_ok) {
+ goto failed;
+ }
+
+ if (val_type != napi_string) {
+ status = napi_coerce_to_string(env, value, &value);
+ if (status != napi_ok) {
+ goto failed;
+ }
+ }
+
status = napi_get_value_string_latin1(env, value, ptr, header_len,
&value_len);
if (status != napi_ok) {
diff --git a/src/nodejs/unit-http/unit.h b/src/nodejs/unit-http/unit.h
index 753a14d8..5f541cc4 100644
--- a/src/nodejs/unit-http/unit.h
+++ b/src/nodejs/unit-http/unit.h
@@ -36,13 +36,14 @@ private:
static napi_value create_server(napi_env env, napi_callback_info info);
static napi_value listen(napi_env env, napi_callback_info info);
+ static napi_value _read(napi_env env, napi_callback_info info);
static void request_handler(nxt_unit_request_info_t *req);
+ static int add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
+ static void remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
+ static void quit(nxt_unit_ctx_t *ctx);
napi_value get_server_object();
- napi_value emit(napi_value obj, const char *name, size_t name_len,
- size_t argc, napi_value *argv);
-
napi_value create_socket(napi_value server_obj,
nxt_unit_request_info_t *req);
@@ -52,8 +53,6 @@ private:
napi_value request,
nxt_unit_request_info_t *req, Unit *obj);
- void emit_post_data(napi_value request, nxt_unit_request_info_t *req);
-
static napi_value response_send_headers(napi_env env,
napi_callback_info info);