diff options
author | Alexander Borisov <alexander.borisov@nginx.com> | 2018-10-31 15:51:51 +0300 |
---|---|---|
committer | Alexander Borisov <alexander.borisov@nginx.com> | 2018-10-31 15:51:51 +0300 |
commit | c838c3bd1580735e8020687f94e6307f13aba156 (patch) | |
tree | d88247c68c70ec4770d76037bd5fefcb836754ac /src/nodejs/unit-http/unit.cpp | |
parent | 3b0afb16814353a5d34a7384f4e84e9c17f3fb8e (diff) | |
download | unit-c838c3bd1580735e8020687f94e6307f13aba156.tar.gz unit-c838c3bd1580735e8020687f94e6307f13aba156.tar.bz2 |
Node.js: added async request execution.
Diffstat (limited to '')
-rw-r--r-- | src/nodejs/unit-http/unit.cpp | 227 |
1 files changed, 145 insertions, 82 deletions
diff --git a/src/nodejs/unit-http/unit.cpp b/src/nodejs/unit-http/unit.cpp index 40f641a6..5d7f15c0 100644 --- a/src/nodejs/unit-http/unit.cpp +++ b/src/nodejs/unit-http/unit.cpp @@ -5,6 +5,11 @@ #include "unit.h" +#include <unistd.h> +#include <fcntl.h> + +#include <uv.h> + napi_ref Unit::constructor_; @@ -31,11 +36,12 @@ Unit::init(napi_env env, napi_value exports) napi_property_descriptor properties[] = { { "createServer", 0, create_server, 0, 0, 0, napi_default, 0 }, - { "listen", 0, listen, 0, 0, 0, napi_default, 0 } + { "listen", 0, listen, 0, 0, 0, napi_default, 0 }, + { "_read", 0, _read, 0, 0, 0, napi_default, 0 } }; status = napi_define_class(env, "Unit", NAPI_AUTO_LENGTH, create, nullptr, - 2, properties, &cons); + 3, properties, &cons); if (status != napi_ok) { goto failed; } @@ -158,14 +164,13 @@ Unit::create_server(napi_env env, napi_callback_info info) { Unit *obj; size_t argc; - napi_value jsthis; + napi_value jsthis, argv; napi_status status; - napi_value argv[1]; nxt_unit_init_t unit_init; argc = 1; - status = napi_get_cb_info(env, info, &argc, argv, &jsthis, nullptr); + status = napi_get_cb_info(env, info, &argc, &argv, &jsthis, nullptr); if (status != napi_ok) { goto failed; } @@ -179,6 +184,8 @@ Unit::create_server(napi_env env, napi_callback_info info) unit_init.data = obj; unit_init.callbacks.request_handler = request_handler; + unit_init.callbacks.add_port = add_port; + unit_init.callbacks.remove_port = remove_port; obj->unit_ctx_ = nxt_unit_init(&unit_init); if (obj->unit_ctx_ == NULL) { @@ -198,41 +205,53 @@ failed: napi_value Unit::listen(napi_env env, napi_callback_info info) { - int ret; - Unit *obj; - napi_value jsthis; - napi_status status; + return nullptr; +} + + +napi_value +Unit::_read(napi_env env, napi_callback_info info) +{ + Unit *obj; + void *data; + size_t argc; + int64_t req_pointer; + napi_value jsthis, buffer, argv; + napi_status status; + nxt_unit_request_info_t *req; + + argc = 1; - status = napi_get_cb_info(env, info, nullptr, nullptr, &jsthis, nullptr); + status = napi_get_cb_info(env, info, &argc, &argv, &jsthis, nullptr); if (status != napi_ok) { - goto failed; + napi_throw_error(env, NULL, "Failed to get arguments from js"); + return nullptr; } status = napi_unwrap(env, jsthis, reinterpret_cast<void **>(&obj)); if (status != napi_ok) { - goto failed; - } - - if (obj->unit_ctx_ == NULL) { - napi_throw_error(env, NULL, "Unit context was not created"); + napi_throw_error(env, NULL, "Failed to get Unit object form js"); return nullptr; } - ret = nxt_unit_run(obj->unit_ctx_); - if (ret != NXT_UNIT_OK) { - napi_throw_error(env, NULL, "Failed to run Unit"); + status = napi_get_value_int64(env, argv, &req_pointer); + if (status != napi_ok) { + napi_throw_error(env, NULL, "Failed to get request pointer"); return nullptr; } - nxt_unit_done(obj->unit_ctx_); - - return nullptr; + req = (nxt_unit_request_info_t *) (uintptr_t) req_pointer; -failed: + status = napi_create_buffer(env, (size_t) req->content_length, + &data, &buffer); + if (status != napi_ok) { + napi_throw_error(env, NULL, "Failed to create request buffer"); + return nullptr; + } - napi_throw_error(env, NULL, "Failed to listen Unit socket"); + nxt_unit_request_read(req, data, req->content_length); - return nullptr; + return buffer; } @@ -242,8 +261,9 @@ Unit::request_handler(nxt_unit_request_info_t *req) Unit *obj; napi_value socket, request, response; napi_value global, server_obj; - napi_value req_argv[3]; + napi_value run_events, events_res; napi_status status; + napi_value events_args[3]; obj = reinterpret_cast<Unit *>(req->unit->data); @@ -284,73 +304,126 @@ Unit::request_handler(nxt_unit_request_info_t *req) return; } - req_argv[1] = request; - req_argv[2] = response; - status = obj->create_headers(req, request); if (status != napi_ok) { napi_throw_error(obj->env_, NULL, "Failed to create headers"); return; } - obj->emit(server_obj, "request", sizeof("request") - 1, 3, req_argv); - obj->emit_post_data(request, req); + status = napi_get_named_property(obj->env_, server_obj, "run_events", + &run_events); + if (status != napi_ok) { + napi_throw_error(obj->env_, NULL, "Failed to get" + " 'run_events' function"); + return; + } + + events_args[0] = server_obj; + events_args[1] = request; + events_args[2] = response; + + status = napi_call_function(obj->env_, server_obj, run_events, 3, + events_args, &events_res); + if (status != napi_ok) { + napi_throw_error(obj->env_, NULL, "Failed to call" + " 'run_events' function"); + return; + } napi_close_handle_scope(obj->env_, scope); } -napi_value -Unit::get_server_object() +void +nxt_uv_read_callback(uv_poll_t *handle, int status, int events) { - napi_value unit_obj, server_obj; + nxt_unit_run_once((nxt_unit_ctx_t *) handle->data); +} + + +int +Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) +{ + int err; + Unit *obj; + uv_loop_t *loop; + uv_poll_t *uv_handle; napi_status status; - status = napi_get_reference_value(env_, wrapper_, &unit_obj); - if (status != napi_ok) { - return nullptr; - } + if (port->in_fd != -1) { + obj = reinterpret_cast<Unit *>(ctx->unit->data); - status = napi_get_named_property(env_, unit_obj, "server", &server_obj); - if (status != napi_ok) { - return nullptr; + if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) { + napi_throw_error(obj->env_, NULL, "Failed to upgrade read" + " file descriptor to O_NONBLOCK"); + return -1; + } + + status = napi_get_uv_event_loop(obj->env_, &loop); + if (status != napi_ok) { + napi_throw_error(obj->env_, NULL, "Failed to get uv.loop"); + return NXT_UNIT_ERROR; + } + + uv_handle = new uv_poll_t; + + err = uv_poll_init(loop, uv_handle, port->in_fd); + if (err < 0) { + napi_throw_error(obj->env_, NULL, "Failed to init uv.poll"); + return NXT_UNIT_ERROR; + } + + err = uv_poll_start(uv_handle, UV_READABLE, nxt_uv_read_callback); + if (err < 0) { + napi_throw_error(obj->env_, NULL, "Failed to start uv.poll"); + return NXT_UNIT_ERROR; + } + + port->data = uv_handle; + uv_handle->data = ctx; } - return server_obj; + return nxt_unit_add_port(ctx, port); } -napi_value -Unit::emit(napi_value obj, const char *name, size_t name_len, size_t argc, - napi_value *argv) +void +Unit::remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) { - napi_value emitter, return_val, str; - napi_status status; + nxt_unit_port_t *port; - status = napi_get_named_property(env_, obj, "emit", &emitter); - if (status != napi_ok) { - return nullptr; + port = nxt_unit_find_port(ctx, port_id); + if (port == NULL) { + return; } - status = napi_create_string_latin1(env_, name, name_len, &str); - if (status != napi_ok) { - return nullptr; + if (port->in_fd != -1 && port->data != NULL) { + uv_poll_stop((uv_poll_t *) port->data); + + delete (uv_poll_t *) port->data; } - if (argc != 0) { - argv[0] = str; + nxt_unit_remove_port(ctx, port_id); +} - } else { - argc = 1; - argv = &str; + +napi_value +Unit::get_server_object() +{ + napi_value unit_obj, server_obj; + napi_status status; + + status = napi_get_reference_value(env_, wrapper_, &unit_obj); + if (status != napi_ok) { + return nullptr; } - status = napi_call_function(env_, obj, emitter, argc, argv, &return_val); + status = napi_get_named_property(env_, unit_obj, "server", &server_obj); if (status != napi_ok) { return nullptr; } - return return_val; + return server_obj; } @@ -480,7 +553,7 @@ Unit::append_header(nxt_unit_field_t *f, napi_value headers, napi_value Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req) { - napi_value constructor, return_val; + napi_value constructor, return_val, req_pointer; napi_status status; status = napi_get_named_property(env_, server_obj, "socket", @@ -494,6 +567,17 @@ Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req) return nullptr; } + status = napi_create_int64(env_, (uintptr_t) req, &req_pointer); + if (status != napi_ok) { + return nullptr; + } + + status = napi_set_named_property(env_, return_val, "req_pointer", + req_pointer); + if (status != napi_ok) { + return nullptr; + } + return return_val; } @@ -563,27 +647,6 @@ Unit::create_response(napi_value server_obj, napi_value socket, } -void -Unit::emit_post_data(napi_value request, nxt_unit_request_info_t *req) -{ - void *data; - napi_value req_argv[2]; - napi_status status; - - status = napi_create_buffer(env_, (size_t) req->content_length, - &data, &req_argv[1]); - if (status != napi_ok) { - napi_throw_error(env_, NULL, "Failed to create request buffer"); - return; - } - - nxt_unit_request_read(req, data, req->content_length); - - emit(request, "data", sizeof("data") - 1, 2, req_argv); - emit(request, "end", sizeof("end") - 1, 0, nullptr); -} - - napi_value Unit::response_send_headers(napi_env env, napi_callback_info info) { |