diff options
author | Alexander Borisov <alexander.borisov@nginx.com> | 2018-10-31 15:51:51 +0300 |
---|---|---|
committer | Alexander Borisov <alexander.borisov@nginx.com> | 2018-10-31 15:51:51 +0300 |
commit | c838c3bd1580735e8020687f94e6307f13aba156 (patch) | |
tree | d88247c68c70ec4770d76037bd5fefcb836754ac /src | |
parent | 3b0afb16814353a5d34a7384f4e84e9c17f3fb8e (diff) | |
download | unit-c838c3bd1580735e8020687f94e6307f13aba156.tar.gz unit-c838c3bd1580735e8020687f94e6307f13aba156.tar.bz2 |
Node.js: added async request execution.
Diffstat (limited to 'src')
-rwxr-xr-x | src/nodejs/unit-http/http_server.js | 29 | ||||
-rw-r--r-- | src/nodejs/unit-http/unit.cpp | 227 | ||||
-rw-r--r-- | src/nodejs/unit-http/unit.h | 8 | ||||
-rw-r--r-- | src/nxt_unit.c | 3 | ||||
-rw-r--r-- | src/nxt_unit.h | 2 |
5 files changed, 176 insertions, 93 deletions
diff --git a/src/nodejs/unit-http/http_server.js b/src/nodejs/unit-http/http_server.js index fa7b8e9b..ddacb420 100755 --- a/src/nodejs/unit-http/http_server.js +++ b/src/nodejs/unit-http/http_server.js @@ -232,7 +232,6 @@ ServerResponse.prototype.write = function write(chunk, encoding, callback) { ServerResponse.prototype.end = function end(chunk, encoding, callback) { this._writeBody(chunk, encoding, callback); - unit_lib.unit_response_end(this) this.finished = true; @@ -290,10 +289,10 @@ function Server(requestListener) { EventEmitter.call(this); this.unit = new unit_lib.Unit(); - this.unit.createServer(); - this.unit.server = this; + this.unit.createServer(); + this.socket = Socket; this.request = ServerRequest; this.response = ServerResponse; @@ -318,10 +317,32 @@ Server.prototype.listen = function () { this.unit.listen(); }; +Server.prototype.run_events = function (server, req, res) { + /* Important!!! setImmediate starts the next iteration in Node.js loop. */ + setImmediate(function () { + server.emit("request", req, res); + + Promise.resolve().then(() => { + let buf = server.unit._read(req.socket.req_pointer); + + if (buf.length != 0) { + req.emit("data", buf); + } + + req.emit("end"); + }); + + Promise.resolve().then(() => { + if (res.finished) { + unit_lib.unit_response_end(res); + } + }); + }); +}; + function connectionListener(socket) { } - module.exports = { STATUS_CODES: http.STATUS_CODES, Server, diff --git a/src/nodejs/unit-http/unit.cpp b/src/nodejs/unit-http/unit.cpp index 40f641a6..5d7f15c0 100644 --- a/src/nodejs/unit-http/unit.cpp +++ b/src/nodejs/unit-http/unit.cpp @@ -5,6 +5,11 @@ #include "unit.h" +#include <unistd.h> +#include <fcntl.h> + +#include <uv.h> + napi_ref Unit::constructor_; @@ -31,11 +36,12 @@ Unit::init(napi_env env, napi_value exports) napi_property_descriptor properties[] = { { "createServer", 0, create_server, 0, 0, 0, napi_default, 0 }, - { "listen", 0, listen, 0, 0, 0, napi_default, 0 } + { "listen", 0, listen, 0, 0, 0, napi_default, 0 }, + { "_read", 0, _read, 0, 0, 0, napi_default, 0 } }; status = napi_define_class(env, "Unit", NAPI_AUTO_LENGTH, create, nullptr, - 2, properties, &cons); + 3, properties, &cons); if (status != napi_ok) { goto failed; } @@ -158,14 +164,13 @@ Unit::create_server(napi_env env, napi_callback_info info) { Unit *obj; size_t argc; - napi_value jsthis; + napi_value jsthis, argv; napi_status status; - napi_value argv[1]; nxt_unit_init_t unit_init; argc = 1; - status = napi_get_cb_info(env, info, &argc, argv, &jsthis, nullptr); + status = napi_get_cb_info(env, info, &argc, &argv, &jsthis, nullptr); if (status != napi_ok) { goto failed; } @@ -179,6 +184,8 @@ Unit::create_server(napi_env env, napi_callback_info info) unit_init.data = obj; unit_init.callbacks.request_handler = request_handler; + unit_init.callbacks.add_port = add_port; + unit_init.callbacks.remove_port = remove_port; obj->unit_ctx_ = nxt_unit_init(&unit_init); if (obj->unit_ctx_ == NULL) { @@ -198,41 +205,53 @@ failed: napi_value Unit::listen(napi_env env, napi_callback_info info) { - int ret; - Unit *obj; - napi_value jsthis; - napi_status status; + return nullptr; +} + + +napi_value +Unit::_read(napi_env env, napi_callback_info info) +{ + Unit *obj; + void *data; + size_t argc; + int64_t req_pointer; + napi_value jsthis, buffer, argv; + napi_status status; + nxt_unit_request_info_t *req; + + argc = 1; - status = napi_get_cb_info(env, info, nullptr, nullptr, &jsthis, nullptr); + status = napi_get_cb_info(env, info, &argc, &argv, &jsthis, nullptr); if (status != napi_ok) { - goto failed; + napi_throw_error(env, NULL, "Failed to get arguments from js"); + return nullptr; } status = napi_unwrap(env, jsthis, reinterpret_cast<void **>(&obj)); if (status != napi_ok) { - goto failed; - } - - if (obj->unit_ctx_ == NULL) { - napi_throw_error(env, NULL, "Unit context was not created"); + napi_throw_error(env, NULL, "Failed to get Unit object form js"); return nullptr; } - ret = nxt_unit_run(obj->unit_ctx_); - if (ret != NXT_UNIT_OK) { - napi_throw_error(env, NULL, "Failed to run Unit"); + status = napi_get_value_int64(env, argv, &req_pointer); + if (status != napi_ok) { + napi_throw_error(env, NULL, "Failed to get request pointer"); return nullptr; } - nxt_unit_done(obj->unit_ctx_); - - return nullptr; + req = (nxt_unit_request_info_t *) (uintptr_t) req_pointer; -failed: + status = napi_create_buffer(env, (size_t) req->content_length, + &data, &buffer); + if (status != napi_ok) { + napi_throw_error(env, NULL, "Failed to create request buffer"); + return nullptr; + } - napi_throw_error(env, NULL, "Failed to listen Unit socket"); + nxt_unit_request_read(req, data, req->content_length); - return nullptr; + return buffer; } @@ -242,8 +261,9 @@ Unit::request_handler(nxt_unit_request_info_t *req) Unit *obj; napi_value socket, request, response; napi_value global, server_obj; - napi_value req_argv[3]; + napi_value run_events, events_res; napi_status status; + napi_value events_args[3]; obj = reinterpret_cast<Unit *>(req->unit->data); @@ -284,73 +304,126 @@ Unit::request_handler(nxt_unit_request_info_t *req) return; } - req_argv[1] = request; - req_argv[2] = response; - status = obj->create_headers(req, request); if (status != napi_ok) { napi_throw_error(obj->env_, NULL, "Failed to create headers"); return; } - obj->emit(server_obj, "request", sizeof("request") - 1, 3, req_argv); - obj->emit_post_data(request, req); + status = napi_get_named_property(obj->env_, server_obj, "run_events", + &run_events); + if (status != napi_ok) { + napi_throw_error(obj->env_, NULL, "Failed to get" + " 'run_events' function"); + return; + } + + events_args[0] = server_obj; + events_args[1] = request; + events_args[2] = response; + + status = napi_call_function(obj->env_, server_obj, run_events, 3, + events_args, &events_res); + if (status != napi_ok) { + napi_throw_error(obj->env_, NULL, "Failed to call" + " 'run_events' function"); + return; + } napi_close_handle_scope(obj->env_, scope); } -napi_value -Unit::get_server_object() +void +nxt_uv_read_callback(uv_poll_t *handle, int status, int events) { - napi_value unit_obj, server_obj; + nxt_unit_run_once((nxt_unit_ctx_t *) handle->data); +} + + +int +Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) +{ + int err; + Unit *obj; + uv_loop_t *loop; + uv_poll_t *uv_handle; napi_status status; - status = napi_get_reference_value(env_, wrapper_, &unit_obj); - if (status != napi_ok) { - return nullptr; - } + if (port->in_fd != -1) { + obj = reinterpret_cast<Unit *>(ctx->unit->data); - status = napi_get_named_property(env_, unit_obj, "server", &server_obj); - if (status != napi_ok) { - return nullptr; + if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) { + napi_throw_error(obj->env_, NULL, "Failed to upgrade read" + " file descriptor to O_NONBLOCK"); + return -1; + } + + status = napi_get_uv_event_loop(obj->env_, &loop); + if (status != napi_ok) { + napi_throw_error(obj->env_, NULL, "Failed to get uv.loop"); + return NXT_UNIT_ERROR; + } + + uv_handle = new uv_poll_t; + + err = uv_poll_init(loop, uv_handle, port->in_fd); + if (err < 0) { + napi_throw_error(obj->env_, NULL, "Failed to init uv.poll"); + return NXT_UNIT_ERROR; + } + + err = uv_poll_start(uv_handle, UV_READABLE, nxt_uv_read_callback); + if (err < 0) { + napi_throw_error(obj->env_, NULL, "Failed to start uv.poll"); + return NXT_UNIT_ERROR; + } + + port->data = uv_handle; + uv_handle->data = ctx; } - return server_obj; + return nxt_unit_add_port(ctx, port); } -napi_value -Unit::emit(napi_value obj, const char *name, size_t name_len, size_t argc, - napi_value *argv) +void +Unit::remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) { - napi_value emitter, return_val, str; - napi_status status; + nxt_unit_port_t *port; - status = napi_get_named_property(env_, obj, "emit", &emitter); - if (status != napi_ok) { - return nullptr; + port = nxt_unit_find_port(ctx, port_id); + if (port == NULL) { + return; } - status = napi_create_string_latin1(env_, name, name_len, &str); - if (status != napi_ok) { - return nullptr; + if (port->in_fd != -1 && port->data != NULL) { + uv_poll_stop((uv_poll_t *) port->data); + + delete (uv_poll_t *) port->data; } - if (argc != 0) { - argv[0] = str; + nxt_unit_remove_port(ctx, port_id); +} - } else { - argc = 1; - argv = &str; + +napi_value +Unit::get_server_object() +{ + napi_value unit_obj, server_obj; + napi_status status; + + status = napi_get_reference_value(env_, wrapper_, &unit_obj); + if (status != napi_ok) { + return nullptr; } - status = napi_call_function(env_, obj, emitter, argc, argv, &return_val); + status = napi_get_named_property(env_, unit_obj, "server", &server_obj); if (status != napi_ok) { return nullptr; } - return return_val; + return server_obj; } @@ -480,7 +553,7 @@ Unit::append_header(nxt_unit_field_t *f, napi_value headers, napi_value Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req) { - napi_value constructor, return_val; + napi_value constructor, return_val, req_pointer; napi_status status; status = napi_get_named_property(env_, server_obj, "socket", @@ -494,6 +567,17 @@ Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req) return nullptr; } + status = napi_create_int64(env_, (uintptr_t) req, &req_pointer); + if (status != napi_ok) { + return nullptr; + } + + status = napi_set_named_property(env_, return_val, "req_pointer", + req_pointer); + if (status != napi_ok) { + return nullptr; + } + return return_val; } @@ -563,27 +647,6 @@ Unit::create_response(napi_value server_obj, napi_value socket, } -void -Unit::emit_post_data(napi_value request, nxt_unit_request_info_t *req) -{ - void *data; - napi_value req_argv[2]; - napi_status status; - - status = napi_create_buffer(env_, (size_t) req->content_length, - &data, &req_argv[1]); - if (status != napi_ok) { - napi_throw_error(env_, NULL, "Failed to create request buffer"); - return; - } - - nxt_unit_request_read(req, data, req->content_length); - - emit(request, "data", sizeof("data") - 1, 2, req_argv); - emit(request, "end", sizeof("end") - 1, 0, nullptr); -} - - napi_value Unit::response_send_headers(napi_env env, napi_callback_info info) { diff --git a/src/nodejs/unit-http/unit.h b/src/nodejs/unit-http/unit.h index 753a14d8..90c67efc 100644 --- a/src/nodejs/unit-http/unit.h +++ b/src/nodejs/unit-http/unit.h @@ -36,13 +36,13 @@ private: static napi_value create_server(napi_env env, napi_callback_info info); static napi_value listen(napi_env env, napi_callback_info info); + static napi_value _read(napi_env env, napi_callback_info info); static void request_handler(nxt_unit_request_info_t *req); + static int add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); + static void remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id); napi_value get_server_object(); - napi_value emit(napi_value obj, const char *name, size_t name_len, - size_t argc, napi_value *argv); - napi_value create_socket(napi_value server_obj, nxt_unit_request_info_t *req); @@ -52,8 +52,6 @@ private: napi_value request, nxt_unit_request_info_t *req, Unit *obj); - void emit_post_data(napi_value request, nxt_unit_request_info_t *req); - static napi_value response_send_headers(napi_env env, napi_callback_info info); diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 0d1be557..24e51075 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -74,7 +74,6 @@ static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_ctx_t *ctx, pid_t pid, int remove); static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib); -static int nxt_unit_run_once(nxt_unit_ctx_t *ctx); static int nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd); @@ -2697,7 +2696,7 @@ nxt_unit_run(nxt_unit_ctx_t *ctx) } -static int +int nxt_unit_run_once(nxt_unit_ctx_t *ctx) { int rc; diff --git a/src/nxt_unit.h b/src/nxt_unit.h index 1b4923a2..2806d035 100644 --- a/src/nxt_unit.h +++ b/src/nxt_unit.h @@ -196,6 +196,8 @@ int nxt_unit_process_msg(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id, */ int nxt_unit_run(nxt_unit_ctx_t *); +int nxt_unit_run_once(nxt_unit_ctx_t *ctx); + /* Destroy application library object. */ void nxt_unit_done(nxt_unit_ctx_t *); |