diff options
author | Konstantin Pavlov <thresh@nginx.com> | 2018-11-15 16:23:35 +0300 |
---|---|---|
committer | Konstantin Pavlov <thresh@nginx.com> | 2018-11-15 16:23:35 +0300 |
commit | 6ccba253f8d415337a09fb935606447791ce308c (patch) | |
tree | e0f9a8c5e8ede8cef1500c316d7534dd8de7b972 /src/nodejs/unit-http | |
parent | bdde42999b36af85f2f04c0872fdd3e30af52027 (diff) | |
parent | a4b02e17382ccbfc19410c644004c4615b2c2c29 (diff) | |
download | unit-6ccba253f8d415337a09fb935606447791ce308c.tar.gz unit-6ccba253f8d415337a09fb935606447791ce308c.tar.bz2 |
Merged with the default branch.1.6-1
Diffstat (limited to '')
-rw-r--r-- | src/nodejs/unit-http/README.md | 20 | ||||
-rwxr-xr-x | src/nodejs/unit-http/http_server.js | 35 | ||||
-rwxr-xr-x | src/nodejs/unit-http/socket.js | 38 | ||||
-rw-r--r-- | src/nodejs/unit-http/unit.cpp | 291 | ||||
-rw-r--r-- | src/nodejs/unit-http/unit.h | 9 |
5 files changed, 262 insertions, 131 deletions
diff --git a/src/nodejs/unit-http/README.md b/src/nodejs/unit-http/README.md index 71a4067a..b6b975e4 100644 --- a/src/nodejs/unit-http/README.md +++ b/src/nodejs/unit-http/README.md @@ -1,21 +1,5 @@ +[<img src="https://unit.nginx.org/_static/logo.svg" width="40%">](https://unit.nginx.org) + # Node.js Package for NGINX Unit -[<img src="https://unit.nginx.org/_static/logo.svg" width=150px>](https://unit.nginx.org) -Node.js support package for NGINX Unit. For details, see [NGINX Unit documentation](https://unit.nginx.org). - -## Installation - -```bash -npm i unit-http -``` - -## Usage - -```javascript -var http = require('unit-http'); -``` - -## License - -Apache 2.0 diff --git a/src/nodejs/unit-http/http_server.js b/src/nodejs/unit-http/http_server.js index fa7b8e9b..57163c0b 100755 --- a/src/nodejs/unit-http/http_server.js +++ b/src/nodejs/unit-http/http_server.js @@ -78,7 +78,7 @@ ServerResponse.prototype.setHeader = function setHeader(key, value) { this.removeHeader(key); - this.headers[key] = value + ""; + this.headers[key] = value; this.headers_len += header_len + (header_key_len * header_count); this.headers_count += header_count; }; @@ -227,12 +227,11 @@ ServerResponse.prototype._writeBody = function(chunk, encoding, callback) { ServerResponse.prototype.write = function write(chunk, encoding, callback) { this._writeBody(chunk, encoding, callback); - return this; + return true; }; ServerResponse.prototype.end = function end(chunk, encoding, callback) { this._writeBody(chunk, encoding, callback); - unit_lib.unit_response_end(this) this.finished = true; @@ -290,10 +289,10 @@ function Server(requestListener) { EventEmitter.call(this); this.unit = new unit_lib.Unit(); - this.unit.createServer(); - this.unit.server = this; + this.unit.createServer(); + this.socket = Socket; this.request = ServerRequest; this.response = ServerResponse; @@ -318,10 +317,34 @@ Server.prototype.listen = function () { this.unit.listen(); }; +Server.prototype.run_events = function (server, req, res) { + /* Important!!! setImmediate starts the next iteration in Node.js loop. */ + setImmediate(function () { + server.emit("request", req, res); + + Promise.resolve().then(() => { + let buf = server.unit._read(req.socket.req_pointer); + + if (buf.length != 0) { + req.emit("data", buf); + } + + req.emit("end"); + }); + + Promise.resolve().then(() => { + req.emit("finish"); + + if (res.finished) { + unit_lib.unit_response_end(res); + } + }); + }); +}; + function connectionListener(socket) { } - module.exports = { STATUS_CODES: http.STATUS_CODES, Server, diff --git a/src/nodejs/unit-http/socket.js b/src/nodejs/unit-http/socket.js index 89702834..aef065bf 100755 --- a/src/nodejs/unit-http/socket.js +++ b/src/nodejs/unit-http/socket.js @@ -12,15 +12,16 @@ const unit_lib = require('unit-http/build/Release/unit-http.node'); function Socket(options) { EventEmitter.call(this); - if (typeof options === 'number') { - options = { fd: options }; + options = options || {}; - } else if (options === undefined) { - options = {}; + if (typeof options !== 'object') { + throw new TypeError('Options must be object'); } - this.readable = options.readable !== false; - this.writable = options.writable !== false; + this.readable = (typeof options.readable === 'boolean' ? options.readable + : false); + this.writable = (typeof options.writable === 'boolean' ? options.writable + : false); } util.inherits(Socket, EventEmitter); @@ -38,25 +39,24 @@ Socket.prototype.remotePort = 0; Socket.prototype.address = function address() { }; -Socket.prototype.connect = function connect(options, callback) { - if (callback !== null) { - this.once('connect', cb); - } +Socket.prototype.connect = function connect(options, connectListener) { + this.once('connect', connectListener); this.connecting = true; this.writable = true; -}; -Socket.prototype.address = function address() { + return this; }; Socket.prototype.destroy = function destroy(exception) { this.connecting = false; this.readable = false; this.writable = false; + + return this; }; -Socket.prototype.end = function end(data, encoding) { +Socket.prototype.end = function end(data, encoding, callback) { }; Socket.prototype.pause = function pause() { @@ -77,13 +77,15 @@ Socket.prototype.setKeepAlive = function setKeepAlive(enable, initialDelay) { Socket.prototype.setNoDelay = function setNoDelay(noDelay) { }; -Socket.prototype.setTimeout = function setTimeout(msecs, callback) { - this.timeout = msecs; - - if (callback) { - this.on('timeout', callback); +Socket.prototype.setTimeout = function setTimeout(timeout, callback) { + if (typeof timeout !== 'number') { + throw new TypeError('Timeout must be number'); } + this.timeout = timeout; + + this.on('timeout', callback); + return this; }; diff --git a/src/nodejs/unit-http/unit.cpp b/src/nodejs/unit-http/unit.cpp index 40f641a6..be64a59b 100644 --- a/src/nodejs/unit-http/unit.cpp +++ b/src/nodejs/unit-http/unit.cpp @@ -5,10 +5,21 @@ #include "unit.h" +#include <unistd.h> +#include <fcntl.h> + +#include <uv.h> + napi_ref Unit::constructor_; +struct nxt_nodejs_ctx_t { + nxt_unit_port_id_t port_id; + uv_poll_t poll; +}; + + Unit::Unit(napi_env env): env_(env), wrapper_(nullptr), @@ -31,11 +42,12 @@ Unit::init(napi_env env, napi_value exports) napi_property_descriptor properties[] = { { "createServer", 0, create_server, 0, 0, 0, napi_default, 0 }, - { "listen", 0, listen, 0, 0, 0, napi_default, 0 } + { "listen", 0, listen, 0, 0, 0, napi_default, 0 }, + { "_read", 0, _read, 0, 0, 0, napi_default, 0 } }; status = napi_define_class(env, "Unit", NAPI_AUTO_LENGTH, create, nullptr, - 2, properties, &cons); + 3, properties, &cons); if (status != napi_ok) { goto failed; } @@ -105,6 +117,7 @@ napi_value Unit::create(napi_env env, napi_callback_info info) { Unit *obj; + napi_ref ref; napi_value target, cons, instance, jsthis; napi_status status; @@ -129,6 +142,11 @@ Unit::create(napi_env env, napi_callback_info info) goto failed; } + status = napi_create_reference(env, jsthis, 1, &ref); + if (status != napi_ok) { + goto failed; + } + return jsthis; } @@ -143,6 +161,11 @@ Unit::create(napi_env env, napi_callback_info info) goto failed; } + status = napi_create_reference(env, instance, 1, &ref); + if (status != napi_ok) { + goto failed; + } + return instance; failed: @@ -158,14 +181,13 @@ Unit::create_server(napi_env env, napi_callback_info info) { Unit *obj; size_t argc; - napi_value jsthis; + napi_value jsthis, argv; napi_status status; - napi_value argv[1]; nxt_unit_init_t unit_init; argc = 1; - status = napi_get_cb_info(env, info, &argc, argv, &jsthis, nullptr); + status = napi_get_cb_info(env, info, &argc, &argv, &jsthis, nullptr); if (status != napi_ok) { goto failed; } @@ -179,6 +201,9 @@ Unit::create_server(napi_env env, napi_callback_info info) unit_init.data = obj; unit_init.callbacks.request_handler = request_handler; + unit_init.callbacks.add_port = add_port; + unit_init.callbacks.remove_port = remove_port; + unit_init.callbacks.quit = quit; obj->unit_ctx_ = nxt_unit_init(&unit_init); if (obj->unit_ctx_ == NULL) { @@ -198,41 +223,53 @@ failed: napi_value Unit::listen(napi_env env, napi_callback_info info) { - int ret; - Unit *obj; - napi_value jsthis; - napi_status status; + return nullptr; +} + - status = napi_get_cb_info(env, info, nullptr, nullptr, &jsthis, nullptr); +napi_value +Unit::_read(napi_env env, napi_callback_info info) +{ + Unit *obj; + void *data; + size_t argc; + int64_t req_pointer; + napi_value jsthis, buffer, argv; + napi_status status; + nxt_unit_request_info_t *req; + + argc = 1; + + status = napi_get_cb_info(env, info, &argc, &argv, &jsthis, nullptr); if (status != napi_ok) { - goto failed; + napi_throw_error(env, NULL, "Failed to get arguments from js"); + return nullptr; } status = napi_unwrap(env, jsthis, reinterpret_cast<void **>(&obj)); if (status != napi_ok) { - goto failed; - } - - if (obj->unit_ctx_ == NULL) { - napi_throw_error(env, NULL, "Unit context was not created"); + napi_throw_error(env, NULL, "Failed to get Unit object form js"); return nullptr; } - ret = nxt_unit_run(obj->unit_ctx_); - if (ret != NXT_UNIT_OK) { - napi_throw_error(env, NULL, "Failed to run Unit"); + status = napi_get_value_int64(env, argv, &req_pointer); + if (status != napi_ok) { + napi_throw_error(env, NULL, "Failed to get request pointer"); return nullptr; } - nxt_unit_done(obj->unit_ctx_); - - return nullptr; + req = (nxt_unit_request_info_t *) (uintptr_t) req_pointer; -failed: + status = napi_create_buffer(env, (size_t) req->content_length, + &data, &buffer); + if (status != napi_ok) { + napi_throw_error(env, NULL, "Failed to create request buffer"); + return nullptr; + } - napi_throw_error(env, NULL, "Failed to listen Unit socket"); + nxt_unit_request_read(req, data, req->content_length); - return nullptr; + return buffer; } @@ -242,8 +279,9 @@ Unit::request_handler(nxt_unit_request_info_t *req) Unit *obj; napi_value socket, request, response; napi_value global, server_obj; - napi_value req_argv[3]; + napi_value run_events, events_res; napi_status status; + napi_value events_args[3]; obj = reinterpret_cast<Unit *>(req->unit->data); @@ -284,73 +322,143 @@ Unit::request_handler(nxt_unit_request_info_t *req) return; } - req_argv[1] = request; - req_argv[2] = response; - status = obj->create_headers(req, request); if (status != napi_ok) { napi_throw_error(obj->env_, NULL, "Failed to create headers"); return; } - obj->emit(server_obj, "request", sizeof("request") - 1, 3, req_argv); - obj->emit_post_data(request, req); + status = napi_get_named_property(obj->env_, server_obj, "run_events", + &run_events); + if (status != napi_ok) { + napi_throw_error(obj->env_, NULL, "Failed to get" + " 'run_events' function"); + return; + } + + events_args[0] = server_obj; + events_args[1] = request; + events_args[2] = response; + + status = napi_call_function(obj->env_, server_obj, run_events, 3, + events_args, &events_res); + if (status != napi_ok) { + napi_throw_error(obj->env_, NULL, "Failed to call" + " 'run_events' function"); + return; + } napi_close_handle_scope(obj->env_, scope); } -napi_value -Unit::get_server_object() +void +nxt_uv_read_callback(uv_poll_t *handle, int status, int events) { - napi_value unit_obj, server_obj; - napi_status status; + nxt_unit_run_once((nxt_unit_ctx_t *) handle->data); +} - status = napi_get_reference_value(env_, wrapper_, &unit_obj); - if (status != napi_ok) { - return nullptr; + +int +Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) +{ + int err; + Unit *obj; + uv_loop_t *loop; + napi_status status; + nxt_nodejs_ctx_t *node_ctx; + + if (port->in_fd != -1) { + obj = reinterpret_cast<Unit *>(ctx->unit->data); + + if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) { + napi_throw_error(obj->env_, NULL, "Failed to upgrade read" + " file descriptor to O_NONBLOCK"); + return -1; + } + + status = napi_get_uv_event_loop(obj->env_, &loop); + if (status != napi_ok) { + napi_throw_error(obj->env_, NULL, "Failed to get uv.loop"); + return NXT_UNIT_ERROR; + } + + node_ctx = new nxt_nodejs_ctx_t; + + err = uv_poll_init(loop, &node_ctx->poll, port->in_fd); + if (err < 0) { + napi_throw_error(obj->env_, NULL, "Failed to init uv.poll"); + return NXT_UNIT_ERROR; + } + + err = uv_poll_start(&node_ctx->poll, UV_READABLE, nxt_uv_read_callback); + if (err < 0) { + napi_throw_error(obj->env_, NULL, "Failed to start uv.poll"); + return NXT_UNIT_ERROR; + } + + ctx->data = node_ctx; + + node_ctx->port_id = port->id; + node_ctx->poll.data = ctx; } - status = napi_get_named_property(env_, unit_obj, "server", &server_obj); - if (status != napi_ok) { - return nullptr; + return nxt_unit_add_port(ctx, port); +} + + +inline bool +operator == (const nxt_unit_port_id_t &p1, const nxt_unit_port_id_t &p2) +{ + return p1.pid == p2.pid && p1.id == p2.id; +} + + +void +Unit::remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) +{ + nxt_nodejs_ctx_t *node_ctx; + + if (ctx->data != NULL) { + node_ctx = (nxt_nodejs_ctx_t *) ctx->data; + + if (node_ctx->port_id == *port_id) { + uv_poll_stop(&node_ctx->poll); + + delete node_ctx; + + ctx->data = NULL; + } } - return server_obj; + nxt_unit_remove_port(ctx, port_id); +} + + +void +Unit::quit(nxt_unit_ctx_t *ctx) +{ + nxt_unit_done(ctx); } napi_value -Unit::emit(napi_value obj, const char *name, size_t name_len, size_t argc, - napi_value *argv) +Unit::get_server_object() { - napi_value emitter, return_val, str; + napi_value unit_obj, server_obj; napi_status status; - status = napi_get_named_property(env_, obj, "emit", &emitter); - if (status != napi_ok) { - return nullptr; - } - - status = napi_create_string_latin1(env_, name, name_len, &str); + status = napi_get_reference_value(env_, wrapper_, &unit_obj); if (status != napi_ok) { return nullptr; } - if (argc != 0) { - argv[0] = str; - - } else { - argc = 1; - argv = &str; - } - - status = napi_call_function(env_, obj, emitter, argc, argv, &return_val); + status = napi_get_named_property(env_, unit_obj, "server", &server_obj); if (status != napi_ok) { return nullptr; } - return return_val; + return server_obj; } @@ -391,7 +499,7 @@ Unit::create_headers(nxt_unit_request_info_t *req, napi_value request) return status; } - status = napi_set_named_property(env_, request, "raw_headers", raw_headers); + status = napi_set_named_property(env_, request, "rawHeaders", raw_headers); if (status != napi_ok) { return status; } @@ -480,7 +588,7 @@ Unit::append_header(nxt_unit_field_t *f, napi_value headers, napi_value Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req) { - napi_value constructor, return_val; + napi_value constructor, return_val, req_pointer; napi_status status; status = napi_get_named_property(env_, server_obj, "socket", @@ -494,6 +602,17 @@ Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req) return nullptr; } + status = napi_create_int64(env_, (uintptr_t) req, &req_pointer); + if (status != napi_ok) { + return nullptr; + } + + status = napi_set_named_property(env_, return_val, "req_pointer", + req_pointer); + if (status != napi_ok) { + return nullptr; + } + return return_val; } @@ -563,27 +682,6 @@ Unit::create_response(napi_value server_obj, napi_value socket, } -void -Unit::emit_post_data(napi_value request, nxt_unit_request_info_t *req) -{ - void *data; - napi_value req_argv[2]; - napi_status status; - - status = napi_create_buffer(env_, (size_t) req->content_length, - &data, &req_argv[1]); - if (status != napi_ok) { - napi_throw_error(env_, NULL, "Failed to create request buffer"); - return; - } - - nxt_unit_request_read(req, data, req->content_length); - - emit(request, "data", sizeof("data") - 1, 2, req_argv); - emit(request, "end", sizeof("end") - 1, 0, nullptr); -} - - napi_value Unit::response_send_headers(napi_env env, napi_callback_info info) { @@ -598,6 +696,7 @@ Unit::response_send_headers(napi_env env, napi_callback_info info) napi_value this_arg, headers, keys, name, value, array_val; napi_value req_num; napi_status status; + napi_valuetype val_type; nxt_unit_field_t *f; nxt_unit_request_info_t *req; napi_value argv[5]; @@ -707,6 +806,18 @@ Unit::response_send_headers(napi_env env, napi_callback_info info) goto failed; } + napi_typeof(env, array_val, &val_type); + if (status != napi_ok) { + goto failed; + } + + if (val_type != napi_string) { + status = napi_coerce_to_string(env, array_val, &array_val); + if (status != napi_ok) { + goto failed; + } + } + status = napi_get_value_string_latin1(env, array_val, ptr, header_len, &value_len); @@ -732,6 +843,18 @@ Unit::response_send_headers(napi_env env, napi_callback_info info) } } else { + napi_typeof(env, value, &val_type); + if (status != napi_ok) { + goto failed; + } + + if (val_type != napi_string) { + status = napi_coerce_to_string(env, value, &value); + if (status != napi_ok) { + goto failed; + } + } + status = napi_get_value_string_latin1(env, value, ptr, header_len, &value_len); if (status != napi_ok) { diff --git a/src/nodejs/unit-http/unit.h b/src/nodejs/unit-http/unit.h index 753a14d8..5f541cc4 100644 --- a/src/nodejs/unit-http/unit.h +++ b/src/nodejs/unit-http/unit.h @@ -36,13 +36,14 @@ private: static napi_value create_server(napi_env env, napi_callback_info info); static napi_value listen(napi_env env, napi_callback_info info); + static napi_value _read(napi_env env, napi_callback_info info); static void request_handler(nxt_unit_request_info_t *req); + static int add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); + static void remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id); + static void quit(nxt_unit_ctx_t *ctx); napi_value get_server_object(); - napi_value emit(napi_value obj, const char *name, size_t name_len, - size_t argc, napi_value *argv); - napi_value create_socket(napi_value server_obj, nxt_unit_request_info_t *req); @@ -52,8 +53,6 @@ private: napi_value request, nxt_unit_request_info_t *req, Unit *obj); - void emit_post_data(napi_value request, nxt_unit_request_info_t *req); - static napi_value response_send_headers(napi_env env, napi_callback_info info); |