diff options
Diffstat (limited to 'src/nodejs/unit-http/unit.cpp')
-rw-r--r-- | src/nodejs/unit-http/unit.cpp | 291 |
1 files changed, 207 insertions, 84 deletions
diff --git a/src/nodejs/unit-http/unit.cpp b/src/nodejs/unit-http/unit.cpp index 40f641a6..be64a59b 100644 --- a/src/nodejs/unit-http/unit.cpp +++ b/src/nodejs/unit-http/unit.cpp @@ -5,10 +5,21 @@ #include "unit.h" +#include <unistd.h> +#include <fcntl.h> + +#include <uv.h> + napi_ref Unit::constructor_; +struct nxt_nodejs_ctx_t { + nxt_unit_port_id_t port_id; + uv_poll_t poll; +}; + + Unit::Unit(napi_env env): env_(env), wrapper_(nullptr), @@ -31,11 +42,12 @@ Unit::init(napi_env env, napi_value exports) napi_property_descriptor properties[] = { { "createServer", 0, create_server, 0, 0, 0, napi_default, 0 }, - { "listen", 0, listen, 0, 0, 0, napi_default, 0 } + { "listen", 0, listen, 0, 0, 0, napi_default, 0 }, + { "_read", 0, _read, 0, 0, 0, napi_default, 0 } }; status = napi_define_class(env, "Unit", NAPI_AUTO_LENGTH, create, nullptr, - 2, properties, &cons); + 3, properties, &cons); if (status != napi_ok) { goto failed; } @@ -105,6 +117,7 @@ napi_value Unit::create(napi_env env, napi_callback_info info) { Unit *obj; + napi_ref ref; napi_value target, cons, instance, jsthis; napi_status status; @@ -129,6 +142,11 @@ Unit::create(napi_env env, napi_callback_info info) goto failed; } + status = napi_create_reference(env, jsthis, 1, &ref); + if (status != napi_ok) { + goto failed; + } + return jsthis; } @@ -143,6 +161,11 @@ Unit::create(napi_env env, napi_callback_info info) goto failed; } + status = napi_create_reference(env, instance, 1, &ref); + if (status != napi_ok) { + goto failed; + } + return instance; failed: @@ -158,14 +181,13 @@ Unit::create_server(napi_env env, napi_callback_info info) { Unit *obj; size_t argc; - napi_value jsthis; + napi_value jsthis, argv; napi_status status; - napi_value argv[1]; nxt_unit_init_t unit_init; argc = 1; - status = napi_get_cb_info(env, info, &argc, argv, &jsthis, nullptr); + status = napi_get_cb_info(env, info, &argc, &argv, &jsthis, nullptr); if (status != napi_ok) { goto failed; } @@ -179,6 +201,9 @@ Unit::create_server(napi_env env, napi_callback_info info) unit_init.data = obj; unit_init.callbacks.request_handler = request_handler; + unit_init.callbacks.add_port = add_port; + unit_init.callbacks.remove_port = remove_port; + unit_init.callbacks.quit = quit; obj->unit_ctx_ = nxt_unit_init(&unit_init); if (obj->unit_ctx_ == NULL) { @@ -198,41 +223,53 @@ failed: napi_value Unit::listen(napi_env env, napi_callback_info info) { - int ret; - Unit *obj; - napi_value jsthis; - napi_status status; + return nullptr; +} + - status = napi_get_cb_info(env, info, nullptr, nullptr, &jsthis, nullptr); +napi_value +Unit::_read(napi_env env, napi_callback_info info) +{ + Unit *obj; + void *data; + size_t argc; + int64_t req_pointer; + napi_value jsthis, buffer, argv; + napi_status status; + nxt_unit_request_info_t *req; + + argc = 1; + + status = napi_get_cb_info(env, info, &argc, &argv, &jsthis, nullptr); if (status != napi_ok) { - goto failed; + napi_throw_error(env, NULL, "Failed to get arguments from js"); + return nullptr; } status = napi_unwrap(env, jsthis, reinterpret_cast<void **>(&obj)); if (status != napi_ok) { - goto failed; - } - - if (obj->unit_ctx_ == NULL) { - napi_throw_error(env, NULL, "Unit context was not created"); + napi_throw_error(env, NULL, "Failed to get Unit object form js"); return nullptr; } - ret = nxt_unit_run(obj->unit_ctx_); - if (ret != NXT_UNIT_OK) { - napi_throw_error(env, NULL, "Failed to run Unit"); + status = napi_get_value_int64(env, argv, &req_pointer); + if (status != napi_ok) { + napi_throw_error(env, NULL, "Failed to get request pointer"); return nullptr; } - nxt_unit_done(obj->unit_ctx_); - - return nullptr; + req = (nxt_unit_request_info_t *) (uintptr_t) req_pointer; -failed: + status = napi_create_buffer(env, (size_t) req->content_length, + &data, &buffer); + if (status != napi_ok) { + napi_throw_error(env, NULL, "Failed to create request buffer"); + return nullptr; + } - napi_throw_error(env, NULL, "Failed to listen Unit socket"); + nxt_unit_request_read(req, data, req->content_length); - return nullptr; + return buffer; } @@ -242,8 +279,9 @@ Unit::request_handler(nxt_unit_request_info_t *req) Unit *obj; napi_value socket, request, response; napi_value global, server_obj; - napi_value req_argv[3]; + napi_value run_events, events_res; napi_status status; + napi_value events_args[3]; obj = reinterpret_cast<Unit *>(req->unit->data); @@ -284,73 +322,143 @@ Unit::request_handler(nxt_unit_request_info_t *req) return; } - req_argv[1] = request; - req_argv[2] = response; - status = obj->create_headers(req, request); if (status != napi_ok) { napi_throw_error(obj->env_, NULL, "Failed to create headers"); return; } - obj->emit(server_obj, "request", sizeof("request") - 1, 3, req_argv); - obj->emit_post_data(request, req); + status = napi_get_named_property(obj->env_, server_obj, "run_events", + &run_events); + if (status != napi_ok) { + napi_throw_error(obj->env_, NULL, "Failed to get" + " 'run_events' function"); + return; + } + + events_args[0] = server_obj; + events_args[1] = request; + events_args[2] = response; + + status = napi_call_function(obj->env_, server_obj, run_events, 3, + events_args, &events_res); + if (status != napi_ok) { + napi_throw_error(obj->env_, NULL, "Failed to call" + " 'run_events' function"); + return; + } napi_close_handle_scope(obj->env_, scope); } -napi_value -Unit::get_server_object() +void +nxt_uv_read_callback(uv_poll_t *handle, int status, int events) { - napi_value unit_obj, server_obj; - napi_status status; + nxt_unit_run_once((nxt_unit_ctx_t *) handle->data); +} - status = napi_get_reference_value(env_, wrapper_, &unit_obj); - if (status != napi_ok) { - return nullptr; + +int +Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) +{ + int err; + Unit *obj; + uv_loop_t *loop; + napi_status status; + nxt_nodejs_ctx_t *node_ctx; + + if (port->in_fd != -1) { + obj = reinterpret_cast<Unit *>(ctx->unit->data); + + if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) { + napi_throw_error(obj->env_, NULL, "Failed to upgrade read" + " file descriptor to O_NONBLOCK"); + return -1; + } + + status = napi_get_uv_event_loop(obj->env_, &loop); + if (status != napi_ok) { + napi_throw_error(obj->env_, NULL, "Failed to get uv.loop"); + return NXT_UNIT_ERROR; + } + + node_ctx = new nxt_nodejs_ctx_t; + + err = uv_poll_init(loop, &node_ctx->poll, port->in_fd); + if (err < 0) { + napi_throw_error(obj->env_, NULL, "Failed to init uv.poll"); + return NXT_UNIT_ERROR; + } + + err = uv_poll_start(&node_ctx->poll, UV_READABLE, nxt_uv_read_callback); + if (err < 0) { + napi_throw_error(obj->env_, NULL, "Failed to start uv.poll"); + return NXT_UNIT_ERROR; + } + + ctx->data = node_ctx; + + node_ctx->port_id = port->id; + node_ctx->poll.data = ctx; } - status = napi_get_named_property(env_, unit_obj, "server", &server_obj); - if (status != napi_ok) { - return nullptr; + return nxt_unit_add_port(ctx, port); +} + + +inline bool +operator == (const nxt_unit_port_id_t &p1, const nxt_unit_port_id_t &p2) +{ + return p1.pid == p2.pid && p1.id == p2.id; +} + + +void +Unit::remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) +{ + nxt_nodejs_ctx_t *node_ctx; + + if (ctx->data != NULL) { + node_ctx = (nxt_nodejs_ctx_t *) ctx->data; + + if (node_ctx->port_id == *port_id) { + uv_poll_stop(&node_ctx->poll); + + delete node_ctx; + + ctx->data = NULL; + } } - return server_obj; + nxt_unit_remove_port(ctx, port_id); +} + + +void +Unit::quit(nxt_unit_ctx_t *ctx) +{ + nxt_unit_done(ctx); } napi_value -Unit::emit(napi_value obj, const char *name, size_t name_len, size_t argc, - napi_value *argv) +Unit::get_server_object() { - napi_value emitter, return_val, str; + napi_value unit_obj, server_obj; napi_status status; - status = napi_get_named_property(env_, obj, "emit", &emitter); - if (status != napi_ok) { - return nullptr; - } - - status = napi_create_string_latin1(env_, name, name_len, &str); + status = napi_get_reference_value(env_, wrapper_, &unit_obj); if (status != napi_ok) { return nullptr; } - if (argc != 0) { - argv[0] = str; - - } else { - argc = 1; - argv = &str; - } - - status = napi_call_function(env_, obj, emitter, argc, argv, &return_val); + status = napi_get_named_property(env_, unit_obj, "server", &server_obj); if (status != napi_ok) { return nullptr; } - return return_val; + return server_obj; } @@ -391,7 +499,7 @@ Unit::create_headers(nxt_unit_request_info_t *req, napi_value request) return status; } - status = napi_set_named_property(env_, request, "raw_headers", raw_headers); + status = napi_set_named_property(env_, request, "rawHeaders", raw_headers); if (status != napi_ok) { return status; } @@ -480,7 +588,7 @@ Unit::append_header(nxt_unit_field_t *f, napi_value headers, napi_value Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req) { - napi_value constructor, return_val; + napi_value constructor, return_val, req_pointer; napi_status status; status = napi_get_named_property(env_, server_obj, "socket", @@ -494,6 +602,17 @@ Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req) return nullptr; } + status = napi_create_int64(env_, (uintptr_t) req, &req_pointer); + if (status != napi_ok) { + return nullptr; + } + + status = napi_set_named_property(env_, return_val, "req_pointer", + req_pointer); + if (status != napi_ok) { + return nullptr; + } + return return_val; } @@ -563,27 +682,6 @@ Unit::create_response(napi_value server_obj, napi_value socket, } -void -Unit::emit_post_data(napi_value request, nxt_unit_request_info_t *req) -{ - void *data; - napi_value req_argv[2]; - napi_status status; - - status = napi_create_buffer(env_, (size_t) req->content_length, - &data, &req_argv[1]); - if (status != napi_ok) { - napi_throw_error(env_, NULL, "Failed to create request buffer"); - return; - } - - nxt_unit_request_read(req, data, req->content_length); - - emit(request, "data", sizeof("data") - 1, 2, req_argv); - emit(request, "end", sizeof("end") - 1, 0, nullptr); -} - - napi_value Unit::response_send_headers(napi_env env, napi_callback_info info) { @@ -598,6 +696,7 @@ Unit::response_send_headers(napi_env env, napi_callback_info info) napi_value this_arg, headers, keys, name, value, array_val; napi_value req_num; napi_status status; + napi_valuetype val_type; nxt_unit_field_t *f; nxt_unit_request_info_t *req; napi_value argv[5]; @@ -707,6 +806,18 @@ Unit::response_send_headers(napi_env env, napi_callback_info info) goto failed; } + napi_typeof(env, array_val, &val_type); + if (status != napi_ok) { + goto failed; + } + + if (val_type != napi_string) { + status = napi_coerce_to_string(env, array_val, &array_val); + if (status != napi_ok) { + goto failed; + } + } + status = napi_get_value_string_latin1(env, array_val, ptr, header_len, &value_len); @@ -732,6 +843,18 @@ Unit::response_send_headers(napi_env env, napi_callback_info info) } } else { + napi_typeof(env, value, &val_type); + if (status != napi_ok) { + goto failed; + } + + if (val_type != napi_string) { + status = napi_coerce_to_string(env, value, &value); + if (status != napi_ok) { + goto failed; + } + } + status = napi_get_value_string_latin1(env, value, ptr, header_len, &value_len); if (status != napi_ok) { |