summaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorAlexander Borisov <alexander.borisov@nginx.com>2018-10-31 15:51:51 +0300
committerAlexander Borisov <alexander.borisov@nginx.com>2018-10-31 15:51:51 +0300
commitc838c3bd1580735e8020687f94e6307f13aba156 (patch)
treed88247c68c70ec4770d76037bd5fefcb836754ac /src
parent3b0afb16814353a5d34a7384f4e84e9c17f3fb8e (diff)
downloadunit-c838c3bd1580735e8020687f94e6307f13aba156.tar.gz
unit-c838c3bd1580735e8020687f94e6307f13aba156.tar.bz2
Node.js: added async request execution.
Diffstat (limited to 'src')
-rwxr-xr-xsrc/nodejs/unit-http/http_server.js29
-rw-r--r--src/nodejs/unit-http/unit.cpp227
-rw-r--r--src/nodejs/unit-http/unit.h8
-rw-r--r--src/nxt_unit.c3
-rw-r--r--src/nxt_unit.h2
5 files changed, 176 insertions, 93 deletions
diff --git a/src/nodejs/unit-http/http_server.js b/src/nodejs/unit-http/http_server.js
index fa7b8e9b..ddacb420 100755
--- a/src/nodejs/unit-http/http_server.js
+++ b/src/nodejs/unit-http/http_server.js
@@ -232,7 +232,6 @@ ServerResponse.prototype.write = function write(chunk, encoding, callback) {
ServerResponse.prototype.end = function end(chunk, encoding, callback) {
this._writeBody(chunk, encoding, callback);
- unit_lib.unit_response_end(this)
this.finished = true;
@@ -290,10 +289,10 @@ function Server(requestListener) {
EventEmitter.call(this);
this.unit = new unit_lib.Unit();
- this.unit.createServer();
-
this.unit.server = this;
+ this.unit.createServer();
+
this.socket = Socket;
this.request = ServerRequest;
this.response = ServerResponse;
@@ -318,10 +317,32 @@ Server.prototype.listen = function () {
this.unit.listen();
};
+Server.prototype.run_events = function (server, req, res) {
+ /* Important!!! setImmediate starts the next iteration in Node.js loop. */
+ setImmediate(function () {
+ server.emit("request", req, res);
+
+ Promise.resolve().then(() => {
+ let buf = server.unit._read(req.socket.req_pointer);
+
+ if (buf.length != 0) {
+ req.emit("data", buf);
+ }
+
+ req.emit("end");
+ });
+
+ Promise.resolve().then(() => {
+ if (res.finished) {
+ unit_lib.unit_response_end(res);
+ }
+ });
+ });
+};
+
function connectionListener(socket) {
}
-
module.exports = {
STATUS_CODES: http.STATUS_CODES,
Server,
diff --git a/src/nodejs/unit-http/unit.cpp b/src/nodejs/unit-http/unit.cpp
index 40f641a6..5d7f15c0 100644
--- a/src/nodejs/unit-http/unit.cpp
+++ b/src/nodejs/unit-http/unit.cpp
@@ -5,6 +5,11 @@
#include "unit.h"
+#include <unistd.h>
+#include <fcntl.h>
+
+#include <uv.h>
+
napi_ref Unit::constructor_;
@@ -31,11 +36,12 @@ Unit::init(napi_env env, napi_value exports)
napi_property_descriptor properties[] = {
{ "createServer", 0, create_server, 0, 0, 0, napi_default, 0 },
- { "listen", 0, listen, 0, 0, 0, napi_default, 0 }
+ { "listen", 0, listen, 0, 0, 0, napi_default, 0 },
+ { "_read", 0, _read, 0, 0, 0, napi_default, 0 }
};
status = napi_define_class(env, "Unit", NAPI_AUTO_LENGTH, create, nullptr,
- 2, properties, &cons);
+ 3, properties, &cons);
if (status != napi_ok) {
goto failed;
}
@@ -158,14 +164,13 @@ Unit::create_server(napi_env env, napi_callback_info info)
{
Unit *obj;
size_t argc;
- napi_value jsthis;
+ napi_value jsthis, argv;
napi_status status;
- napi_value argv[1];
nxt_unit_init_t unit_init;
argc = 1;
- status = napi_get_cb_info(env, info, &argc, argv, &jsthis, nullptr);
+ status = napi_get_cb_info(env, info, &argc, &argv, &jsthis, nullptr);
if (status != napi_ok) {
goto failed;
}
@@ -179,6 +184,8 @@ Unit::create_server(napi_env env, napi_callback_info info)
unit_init.data = obj;
unit_init.callbacks.request_handler = request_handler;
+ unit_init.callbacks.add_port = add_port;
+ unit_init.callbacks.remove_port = remove_port;
obj->unit_ctx_ = nxt_unit_init(&unit_init);
if (obj->unit_ctx_ == NULL) {
@@ -198,41 +205,53 @@ failed:
napi_value
Unit::listen(napi_env env, napi_callback_info info)
{
- int ret;
- Unit *obj;
- napi_value jsthis;
- napi_status status;
+ return nullptr;
+}
+
+
+napi_value
+Unit::_read(napi_env env, napi_callback_info info)
+{
+ Unit *obj;
+ void *data;
+ size_t argc;
+ int64_t req_pointer;
+ napi_value jsthis, buffer, argv;
+ napi_status status;
+ nxt_unit_request_info_t *req;
+
+ argc = 1;
- status = napi_get_cb_info(env, info, nullptr, nullptr, &jsthis, nullptr);
+ status = napi_get_cb_info(env, info, &argc, &argv, &jsthis, nullptr);
if (status != napi_ok) {
- goto failed;
+ napi_throw_error(env, NULL, "Failed to get arguments from js");
+ return nullptr;
}
status = napi_unwrap(env, jsthis, reinterpret_cast<void **>(&obj));
if (status != napi_ok) {
- goto failed;
- }
-
- if (obj->unit_ctx_ == NULL) {
- napi_throw_error(env, NULL, "Unit context was not created");
+ napi_throw_error(env, NULL, "Failed to get Unit object form js");
return nullptr;
}
- ret = nxt_unit_run(obj->unit_ctx_);
- if (ret != NXT_UNIT_OK) {
- napi_throw_error(env, NULL, "Failed to run Unit");
+ status = napi_get_value_int64(env, argv, &req_pointer);
+ if (status != napi_ok) {
+ napi_throw_error(env, NULL, "Failed to get request pointer");
return nullptr;
}
- nxt_unit_done(obj->unit_ctx_);
-
- return nullptr;
+ req = (nxt_unit_request_info_t *) (uintptr_t) req_pointer;
-failed:
+ status = napi_create_buffer(env, (size_t) req->content_length,
+ &data, &buffer);
+ if (status != napi_ok) {
+ napi_throw_error(env, NULL, "Failed to create request buffer");
+ return nullptr;
+ }
- napi_throw_error(env, NULL, "Failed to listen Unit socket");
+ nxt_unit_request_read(req, data, req->content_length);
- return nullptr;
+ return buffer;
}
@@ -242,8 +261,9 @@ Unit::request_handler(nxt_unit_request_info_t *req)
Unit *obj;
napi_value socket, request, response;
napi_value global, server_obj;
- napi_value req_argv[3];
+ napi_value run_events, events_res;
napi_status status;
+ napi_value events_args[3];
obj = reinterpret_cast<Unit *>(req->unit->data);
@@ -284,73 +304,126 @@ Unit::request_handler(nxt_unit_request_info_t *req)
return;
}
- req_argv[1] = request;
- req_argv[2] = response;
-
status = obj->create_headers(req, request);
if (status != napi_ok) {
napi_throw_error(obj->env_, NULL, "Failed to create headers");
return;
}
- obj->emit(server_obj, "request", sizeof("request") - 1, 3, req_argv);
- obj->emit_post_data(request, req);
+ status = napi_get_named_property(obj->env_, server_obj, "run_events",
+ &run_events);
+ if (status != napi_ok) {
+ napi_throw_error(obj->env_, NULL, "Failed to get"
+ " 'run_events' function");
+ return;
+ }
+
+ events_args[0] = server_obj;
+ events_args[1] = request;
+ events_args[2] = response;
+
+ status = napi_call_function(obj->env_, server_obj, run_events, 3,
+ events_args, &events_res);
+ if (status != napi_ok) {
+ napi_throw_error(obj->env_, NULL, "Failed to call"
+ " 'run_events' function");
+ return;
+ }
napi_close_handle_scope(obj->env_, scope);
}
-napi_value
-Unit::get_server_object()
+void
+nxt_uv_read_callback(uv_poll_t *handle, int status, int events)
{
- napi_value unit_obj, server_obj;
+ nxt_unit_run_once((nxt_unit_ctx_t *) handle->data);
+}
+
+
+int
+Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
+{
+ int err;
+ Unit *obj;
+ uv_loop_t *loop;
+ uv_poll_t *uv_handle;
napi_status status;
- status = napi_get_reference_value(env_, wrapper_, &unit_obj);
- if (status != napi_ok) {
- return nullptr;
- }
+ if (port->in_fd != -1) {
+ obj = reinterpret_cast<Unit *>(ctx->unit->data);
- status = napi_get_named_property(env_, unit_obj, "server", &server_obj);
- if (status != napi_ok) {
- return nullptr;
+ if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) {
+ napi_throw_error(obj->env_, NULL, "Failed to upgrade read"
+ " file descriptor to O_NONBLOCK");
+ return -1;
+ }
+
+ status = napi_get_uv_event_loop(obj->env_, &loop);
+ if (status != napi_ok) {
+ napi_throw_error(obj->env_, NULL, "Failed to get uv.loop");
+ return NXT_UNIT_ERROR;
+ }
+
+ uv_handle = new uv_poll_t;
+
+ err = uv_poll_init(loop, uv_handle, port->in_fd);
+ if (err < 0) {
+ napi_throw_error(obj->env_, NULL, "Failed to init uv.poll");
+ return NXT_UNIT_ERROR;
+ }
+
+ err = uv_poll_start(uv_handle, UV_READABLE, nxt_uv_read_callback);
+ if (err < 0) {
+ napi_throw_error(obj->env_, NULL, "Failed to start uv.poll");
+ return NXT_UNIT_ERROR;
+ }
+
+ port->data = uv_handle;
+ uv_handle->data = ctx;
}
- return server_obj;
+ return nxt_unit_add_port(ctx, port);
}
-napi_value
-Unit::emit(napi_value obj, const char *name, size_t name_len, size_t argc,
- napi_value *argv)
+void
+Unit::remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
{
- napi_value emitter, return_val, str;
- napi_status status;
+ nxt_unit_port_t *port;
- status = napi_get_named_property(env_, obj, "emit", &emitter);
- if (status != napi_ok) {
- return nullptr;
+ port = nxt_unit_find_port(ctx, port_id);
+ if (port == NULL) {
+ return;
}
- status = napi_create_string_latin1(env_, name, name_len, &str);
- if (status != napi_ok) {
- return nullptr;
+ if (port->in_fd != -1 && port->data != NULL) {
+ uv_poll_stop((uv_poll_t *) port->data);
+
+ delete (uv_poll_t *) port->data;
}
- if (argc != 0) {
- argv[0] = str;
+ nxt_unit_remove_port(ctx, port_id);
+}
- } else {
- argc = 1;
- argv = &str;
+
+napi_value
+Unit::get_server_object()
+{
+ napi_value unit_obj, server_obj;
+ napi_status status;
+
+ status = napi_get_reference_value(env_, wrapper_, &unit_obj);
+ if (status != napi_ok) {
+ return nullptr;
}
- status = napi_call_function(env_, obj, emitter, argc, argv, &return_val);
+ status = napi_get_named_property(env_, unit_obj, "server", &server_obj);
if (status != napi_ok) {
return nullptr;
}
- return return_val;
+ return server_obj;
}
@@ -480,7 +553,7 @@ Unit::append_header(nxt_unit_field_t *f, napi_value headers,
napi_value
Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req)
{
- napi_value constructor, return_val;
+ napi_value constructor, return_val, req_pointer;
napi_status status;
status = napi_get_named_property(env_, server_obj, "socket",
@@ -494,6 +567,17 @@ Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req)
return nullptr;
}
+ status = napi_create_int64(env_, (uintptr_t) req, &req_pointer);
+ if (status != napi_ok) {
+ return nullptr;
+ }
+
+ status = napi_set_named_property(env_, return_val, "req_pointer",
+ req_pointer);
+ if (status != napi_ok) {
+ return nullptr;
+ }
+
return return_val;
}
@@ -563,27 +647,6 @@ Unit::create_response(napi_value server_obj, napi_value socket,
}
-void
-Unit::emit_post_data(napi_value request, nxt_unit_request_info_t *req)
-{
- void *data;
- napi_value req_argv[2];
- napi_status status;
-
- status = napi_create_buffer(env_, (size_t) req->content_length,
- &data, &req_argv[1]);
- if (status != napi_ok) {
- napi_throw_error(env_, NULL, "Failed to create request buffer");
- return;
- }
-
- nxt_unit_request_read(req, data, req->content_length);
-
- emit(request, "data", sizeof("data") - 1, 2, req_argv);
- emit(request, "end", sizeof("end") - 1, 0, nullptr);
-}
-
-
napi_value
Unit::response_send_headers(napi_env env, napi_callback_info info)
{
diff --git a/src/nodejs/unit-http/unit.h b/src/nodejs/unit-http/unit.h
index 753a14d8..90c67efc 100644
--- a/src/nodejs/unit-http/unit.h
+++ b/src/nodejs/unit-http/unit.h
@@ -36,13 +36,13 @@ private:
static napi_value create_server(napi_env env, napi_callback_info info);
static napi_value listen(napi_env env, napi_callback_info info);
+ static napi_value _read(napi_env env, napi_callback_info info);
static void request_handler(nxt_unit_request_info_t *req);
+ static int add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
+ static void remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
napi_value get_server_object();
- napi_value emit(napi_value obj, const char *name, size_t name_len,
- size_t argc, napi_value *argv);
-
napi_value create_socket(napi_value server_obj,
nxt_unit_request_info_t *req);
@@ -52,8 +52,6 @@ private:
napi_value request,
nxt_unit_request_info_t *req, Unit *obj);
- void emit_post_data(napi_value request, nxt_unit_request_info_t *req);
-
static napi_value response_send_headers(napi_env env,
napi_callback_info info);
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index 0d1be557..24e51075 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -74,7 +74,6 @@ static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx,
static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_ctx_t *ctx,
pid_t pid, int remove);
static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
-static int nxt_unit_run_once(nxt_unit_ctx_t *ctx);
static int nxt_unit_create_port(nxt_unit_ctx_t *ctx,
nxt_unit_port_id_t *port_id, int *fd);
@@ -2697,7 +2696,7 @@ nxt_unit_run(nxt_unit_ctx_t *ctx)
}
-static int
+int
nxt_unit_run_once(nxt_unit_ctx_t *ctx)
{
int rc;
diff --git a/src/nxt_unit.h b/src/nxt_unit.h
index 1b4923a2..2806d035 100644
--- a/src/nxt_unit.h
+++ b/src/nxt_unit.h
@@ -196,6 +196,8 @@ int nxt_unit_process_msg(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id,
*/
int nxt_unit_run(nxt_unit_ctx_t *);
+int nxt_unit_run_once(nxt_unit_ctx_t *ctx);
+
/* Destroy application library object. */
void nxt_unit_done(nxt_unit_ctx_t *);