diff options
Diffstat (limited to 'src/nodejs/unit-http/unit.cpp')
-rw-r--r-- | src/nodejs/unit-http/unit.cpp | 517 |
1 files changed, 383 insertions, 134 deletions
diff --git a/src/nodejs/unit-http/unit.cpp b/src/nodejs/unit-http/unit.cpp index 3f66189a..ac10024c 100644 --- a/src/nodejs/unit-http/unit.cpp +++ b/src/nodejs/unit-http/unit.cpp @@ -10,6 +10,8 @@ #include <uv.h> +#include <nxt_unit_websocket.h> + napi_ref Unit::constructor_; @@ -20,17 +22,27 @@ struct nxt_nodejs_ctx_t { }; +struct req_data_t { + napi_ref sock_ref; + napi_ref resp_ref; + napi_ref conn_ref; +}; + + Unit::Unit(napi_env env, napi_value jsthis): nxt_napi(env), wrapper_(wrap(jsthis, this, destroy)), unit_ctx_(nullptr) { + nxt_unit_debug(NULL, "Unit::Unit()"); } Unit::~Unit() { delete_reference(wrapper_); + + nxt_unit_debug(NULL, "Unit::~Unit()"); } @@ -38,23 +50,26 @@ napi_value Unit::init(napi_env env, napi_value exports) { nxt_napi napi(env); - napi_value cons; + napi_value ctor; - napi_property_descriptor properties[] = { + napi_property_descriptor unit_props[] = { { "createServer", 0, create_server, 0, 0, 0, napi_default, 0 }, { "listen", 0, listen, 0, 0, 0, napi_default, 0 }, - { "_read", 0, _read, 0, 0, 0, napi_default, 0 } }; try { - cons = napi.define_class("Unit", create, 3, properties); - constructor_ = napi.create_reference(cons); + ctor = napi.define_class("Unit", create, 2, unit_props); + constructor_ = napi.create_reference(ctor); - napi.set_named_property(exports, "Unit", cons); - napi.set_named_property(exports, "unit_response_headers", + napi.set_named_property(exports, "Unit", ctor); + napi.set_named_property(exports, "response_send_headers", response_send_headers); - napi.set_named_property(exports, "unit_response_write", response_write); - napi.set_named_property(exports, "unit_response_end", response_end); + napi.set_named_property(exports, "response_write", response_write); + napi.set_named_property(exports, "response_end", response_end); + napi.set_named_property(exports, "websocket_send_frame", + websocket_send_frame); + napi.set_named_property(exports, "websocket_set_sock", + websocket_set_sock); } catch (exception &e) { napi.throw_error(e); @@ -78,7 +93,7 @@ napi_value Unit::create(napi_env env, napi_callback_info info) { nxt_napi napi(env); - napi_value target, cons, instance, jsthis; + napi_value target, ctor, instance, jsthis; try { target = napi.get_new_target(info); @@ -94,8 +109,8 @@ Unit::create(napi_env env, napi_callback_info info) } /* Invoked as plain function `Unit(...)`, turn into construct call. */ - cons = napi.get_reference_value(constructor_); - instance = napi.new_instance(cons); + ctor = napi.get_reference_value(constructor_); + instance = napi.new_instance(ctor); napi.create_reference(instance); } catch (exception &e) { @@ -130,10 +145,14 @@ Unit::create_server(napi_env env, napi_callback_info info) memset(&unit_init, 0, sizeof(nxt_unit_init_t)); unit_init.data = obj; - unit_init.callbacks.request_handler = request_handler; - unit_init.callbacks.add_port = add_port; - unit_init.callbacks.remove_port = remove_port; - unit_init.callbacks.quit = quit; + unit_init.callbacks.request_handler = request_handler_cb; + unit_init.callbacks.websocket_handler = websocket_handler_cb; + unit_init.callbacks.close_handler = close_handler_cb; + unit_init.callbacks.add_port = add_port; + unit_init.callbacks.remove_port = remove_port; + unit_init.callbacks.quit = quit_cb; + + unit_init.request_data_size = sizeof(req_data_t); obj->unit_ctx_ = nxt_unit_init(&unit_init); if (obj->unit_ctx_ == NULL) { @@ -157,74 +176,139 @@ Unit::listen(napi_env env, napi_callback_info info) } -napi_value -Unit::_read(napi_env env, napi_callback_info info) +void +Unit::request_handler_cb(nxt_unit_request_info_t *req) { - void *data; - size_t argc; - nxt_napi napi(env); - napi_value buffer, argv; - nxt_unit_request_info_t *req; + Unit *obj; - argc = 1; + obj = reinterpret_cast<Unit *>(req->unit->data); + + obj->request_handler(req); +} + + +void +Unit::request_handler(nxt_unit_request_info_t *req) +{ + napi_value socket, request, response, server_obj, emit_request; + + memset(req->data, 0, sizeof(req_data_t)); try { - napi.get_cb_info(info, argc, &argv); + nxt_handle_scope scope(env()); + + server_obj = get_server_object(); + + socket = create_socket(server_obj, req); + request = create_request(server_obj, socket); + response = create_response(server_obj, request, req); + + create_headers(req, request); - req = napi.get_request_info(argv); - buffer = napi.create_buffer((size_t) req->content_length, &data); + emit_request = get_named_property(server_obj, "emit_request"); + + nxt_async_context async_context(env(), "request_handler"); + nxt_callback_scope async_scope(async_context); + + make_callback(async_context, server_obj, emit_request, request, + response); } catch (exception &e) { - napi.throw_error(e); - return nullptr; + nxt_unit_req_warn(req, "request_handler: %s", e.str); } +} - nxt_unit_request_read(req, data, req->content_length); - return buffer; +void +Unit::websocket_handler_cb(nxt_unit_websocket_frame_t *ws) +{ + Unit *obj; + + obj = reinterpret_cast<Unit *>(ws->req->unit->data); + + obj->websocket_handler(ws); } void -Unit::request_handler(nxt_unit_request_info_t *req) +Unit::websocket_handler(nxt_unit_websocket_frame_t *ws) { - Unit *obj; - napi_value socket, request, response, server_obj; - napi_value emit_events; - napi_value events_args[3]; + napi_value frame, server_obj, process_frame, conn; + req_data_t *req_data; - obj = reinterpret_cast<Unit *>(req->unit->data); + req_data = (req_data_t *) ws->req->data; try { - nxt_handle_scope scope(obj->env()); - - server_obj = obj->get_server_object(); + nxt_handle_scope scope(env()); - socket = obj->create_socket(server_obj, req); - request = obj->create_request(server_obj, socket); - response = obj->create_response(server_obj, socket, request, req); + server_obj = get_server_object(); - obj->create_headers(req, request); + frame = create_websocket_frame(server_obj, ws); - emit_events = obj->get_named_property(server_obj, "emit_events"); + conn = get_reference_value(req_data->conn_ref); - events_args[0] = server_obj; - events_args[1] = request; - events_args[2] = response; + process_frame = get_named_property(conn, "processFrame"); - nxt_async_context async_context(obj->env(), "unit_request_handler"); + nxt_async_context async_context(env(), "websocket_handler"); nxt_callback_scope async_scope(async_context); - obj->make_callback(async_context, server_obj, emit_events, - 3, events_args); + make_callback(async_context, conn, process_frame, frame); } catch (exception &e) { - obj->throw_error(e); + nxt_unit_req_warn(ws->req, "websocket_handler: %s", e.str); } + + nxt_unit_websocket_done(ws); +} + + +void +Unit::close_handler_cb(nxt_unit_request_info_t *req) +{ + Unit *obj; + + obj = reinterpret_cast<Unit *>(req->unit->data); + + obj->close_handler(req); } void +Unit::close_handler(nxt_unit_request_info_t *req) +{ + napi_value conn_handle_close, conn; + req_data_t *req_data; + + req_data = (req_data_t *) req->data; + + try { + nxt_handle_scope scope(env()); + + conn = get_reference_value(req_data->conn_ref); + + conn_handle_close = get_named_property(conn, "handleSocketClose"); + + nxt_async_context async_context(env(), "close_handler"); + nxt_callback_scope async_scope(async_context); + + make_callback(async_context, conn, conn_handle_close, + nxt_napi::create(0)); + + remove_wrap(req_data->sock_ref); + remove_wrap(req_data->resp_ref); + remove_wrap(req_data->conn_ref); + + } catch (exception &e) { + nxt_unit_req_warn(req, "close_handler: %s", e.str); + + return; + } + + nxt_unit_request_done(req, NXT_UNIT_OK); +} + + +static void nxt_uv_read_callback(uv_poll_t *handle, int status, int events) { nxt_unit_run_once((nxt_unit_ctx_t *) handle->data); @@ -244,14 +328,14 @@ Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) obj = reinterpret_cast<Unit *>(ctx->unit->data); if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) { - obj->throw_error("Failed to upgrade read" - " file descriptor to O_NONBLOCK"); + nxt_unit_warn(ctx, "fcntl(%d, O_NONBLOCK) failed: %s (%d)", + port->in_fd, strerror(errno), errno); return -1; } status = napi_get_uv_event_loop(obj->env(), &loop); if (status != napi_ok) { - obj->throw_error("Failed to get uv.loop"); + nxt_unit_warn(ctx, "Failed to get uv.loop"); return NXT_UNIT_ERROR; } @@ -259,13 +343,13 @@ Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) err = uv_poll_init(loop, &node_ctx->poll, port->in_fd); if (err < 0) { - obj->throw_error("Failed to init uv.poll"); + nxt_unit_warn(ctx, "Failed to init uv.poll"); return NXT_UNIT_ERROR; } err = uv_poll_start(&node_ctx->poll, UV_READABLE, nxt_uv_read_callback); if (err < 0) { - obj->throw_error("Failed to start uv.poll"); + nxt_unit_warn(ctx, "Failed to start uv.poll"); return NXT_UNIT_ERROR; } @@ -308,27 +392,35 @@ Unit::remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) void -Unit::quit(nxt_unit_ctx_t *ctx) +Unit::quit_cb(nxt_unit_ctx_t *ctx) { - Unit *obj; - napi_value server_obj, emit_close; + Unit *obj; obj = reinterpret_cast<Unit *>(ctx->unit->data); + obj->quit(ctx); +} + + +void +Unit::quit(nxt_unit_ctx_t *ctx) +{ + napi_value server_obj, emit_close; + try { - nxt_handle_scope scope(obj->env()); + nxt_handle_scope scope(env()); - server_obj = obj->get_server_object(); + server_obj = get_server_object(); - emit_close = obj->get_named_property(server_obj, "emit_close"); + emit_close = get_named_property(server_obj, "emit_close"); - nxt_async_context async_context(obj->env(), "unit_quit"); + nxt_async_context async_context(env(), "unit_quit"); nxt_callback_scope async_scope(async_context); - obj->make_callback(async_context, server_obj, emit_close, 0, NULL); + make_callback(async_context, server_obj, emit_close); } catch (exception &e) { - obj->throw_error(e); + nxt_unit_debug(ctx, "quit: %s", e.str); } nxt_unit_done(ctx); @@ -349,8 +441,9 @@ Unit::get_server_object() void Unit::create_headers(nxt_unit_request_info_t *req, napi_value request) { + void *data; uint32_t i; - napi_value headers, raw_headers; + napi_value headers, raw_headers, buffer; napi_status status; nxt_unit_request_t *r; @@ -373,6 +466,13 @@ Unit::create_headers(nxt_unit_request_info_t *req, napi_value request) set_named_property(request, "httpVersion", r->version, r->version_length); set_named_property(request, "method", r->method, r->method_length); set_named_property(request, "url", r->target, r->target_length); + + set_named_property(request, "_websocket_handshake", r->websocket_handshake); + + buffer = create_buffer((size_t) req->content_length, &data); + nxt_unit_request_read(req, data, req->content_length); + + set_named_property(request, "_data", buffer); } @@ -410,15 +510,18 @@ napi_value Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req) { napi_value constructor, res; + req_data_t *req_data; nxt_unit_request_t *r; r = req->request; - constructor = get_named_property(server_obj, "socket"); + constructor = get_named_property(server_obj, "Socket"); res = new_instance(constructor); - set_named_property(res, "req_pointer", (intptr_t) req); + req_data = (req_data_t *) req->data; + req_data->sock_ref = wrap(res, req, sock_destroy); + set_named_property(res, "remoteAddress", r->remote, r->remote_length); set_named_property(res, "localAddress", r->local, r->local_length); @@ -429,34 +532,66 @@ Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req) napi_value Unit::create_request(napi_value server_obj, napi_value socket) { - napi_value constructor, return_val; + napi_value constructor; - constructor = get_named_property(server_obj, "request"); + constructor = get_named_property(server_obj, "ServerRequest"); - return_val = new_instance(constructor, server_obj); + return new_instance(constructor, server_obj, socket); +} - set_named_property(return_val, "socket", socket); - set_named_property(return_val, "connection", socket); - return return_val; +napi_value +Unit::create_response(napi_value server_obj, napi_value request, + nxt_unit_request_info_t *req) +{ + napi_value constructor, res; + req_data_t *req_data; + + constructor = get_named_property(server_obj, "ServerResponse"); + + res = new_instance(constructor, request); + + req_data = (req_data_t *) req->data; + req_data->resp_ref = wrap(res, req, resp_destroy); + + return res; } napi_value -Unit::create_response(napi_value server_obj, napi_value socket, - napi_value request, nxt_unit_request_info_t *req) +Unit::create_websocket_frame(napi_value server_obj, + nxt_unit_websocket_frame_t *ws) { - napi_value constructor, return_val; + void *data; + napi_value constructor, res, buffer; + uint8_t sc[2]; + + constructor = get_named_property(server_obj, "WebSocketFrame"); - constructor = get_named_property(server_obj, "response"); + res = new_instance(constructor); - return_val = new_instance(constructor, request); + set_named_property(res, "fin", (bool) ws->header->fin); + set_named_property(res, "opcode", ws->header->opcode); + set_named_property(res, "length", (int64_t) ws->payload_len); - set_named_property(return_val, "socket", socket); - set_named_property(return_val, "connection", socket); - set_named_property(return_val, "_req_point", (intptr_t) req); + if (ws->header->opcode == NXT_WEBSOCKET_OP_CLOSE) { + if (ws->payload_len >= 2) { + nxt_unit_websocket_read(ws, sc, 2); - return return_val; + set_named_property(res, "closeStatus", + (((uint16_t) sc[0]) << 8) | sc[1]); + + } else { + set_named_property(res, "closeStatus", -1); + } + } + + buffer = create_buffer((size_t) ws->content_length, &data); + nxt_unit_websocket_read(ws, data, ws->content_length); + + set_named_property(res, "binaryPayload", buffer); + + return res; } @@ -472,35 +607,32 @@ Unit::response_send_headers(napi_env env, napi_callback_info info) uint16_t hash; nxt_napi napi(env); napi_value this_arg, headers, keys, name, value, array_val; - napi_value req_num, array_entry; + napi_value array_entry; napi_valuetype val_type; nxt_unit_field_t *f; nxt_unit_request_info_t *req; - napi_value argv[5]; + napi_value argv[4]; - argc = 5; + argc = 4; try { this_arg = napi.get_cb_info(info, argc, argv); - if (argc != 5) { + if (argc != 4) { napi.throw_error("Wrong args count. Expected: " "statusCode, headers, headers count, " "headers length"); return nullptr; } - req_num = napi.get_named_property(argv[0], "_req_point"); - - req = napi.get_request_info(req_num); - - status_code = napi.get_value_uint32(argv[1]); - keys_count = napi.get_value_uint32(argv[3]); - header_len = napi.get_value_uint32(argv[4]); + req = napi.get_request_info(this_arg); + status_code = napi.get_value_uint32(argv[0]); + keys_count = napi.get_value_uint32(argv[2]); + header_len = napi.get_value_uint32(argv[3]); /* Need to reserve extra byte for C-string 0-termination. */ header_len++; - headers = argv[2]; + headers = argv[1]; ret = nxt_unit_response_init(req, status_code, keys_count, header_len); if (ret != NXT_UNIT_OK) { @@ -611,65 +743,151 @@ napi_value Unit::response_write(napi_env env, napi_callback_info info) { int ret; - char *ptr; + void *ptr; size_t argc, have_buf_len; uint32_t buf_len; nxt_napi napi(env); - napi_value this_arg, req_num; - napi_status status; + napi_value this_arg; nxt_unit_buf_t *buf; napi_valuetype buf_type; nxt_unit_request_info_t *req; - napi_value argv[3]; + napi_value argv[2]; - argc = 3; + argc = 2; try { this_arg = napi.get_cb_info(info, argc, argv); - if (argc != 3) { + if (argc != 2) { throw exception("Wrong args count. Expected: " "chunk, chunk length"); } - req_num = napi.get_named_property(argv[0], "_req_point"); - req = napi.get_request_info(req_num); + req = napi.get_request_info(this_arg); + buf_type = napi.type_of(argv[0]); + buf_len = napi.get_value_uint32(argv[1]) + 1; + + buf = nxt_unit_response_buf_alloc(req, buf_len); + if (buf == NULL) { + throw exception("Failed to allocate response buffer"); + } + + if (buf_type == napi_string) { + /* TODO: will work only for utf8 content-type */ + + have_buf_len = napi.get_value_string_utf8(argv[0], buf->free, + buf_len); - buf_len = napi.get_value_uint32(argv[2]); + } else { + ptr = napi.get_buffer_info(argv[0], have_buf_len); - buf_type = napi.type_of(argv[1]); + memcpy(buf->free, ptr, have_buf_len); + } + buf->free += have_buf_len; + + ret = nxt_unit_buf_send(buf); + if (ret != NXT_UNIT_OK) { + throw exception("Failed to send body buf"); + } } catch (exception &e) { napi.throw_error(e); return nullptr; } - buf_len++; + return this_arg; +} - buf = nxt_unit_response_buf_alloc(req, buf_len); - if (buf == NULL) { - goto failed; - } - if (buf_type == napi_string) { - /* TODO: will work only for utf8 content-type */ +napi_value +Unit::response_end(napi_env env, napi_callback_info info) +{ + nxt_napi napi(env); + napi_value this_arg; + req_data_t *req_data; + nxt_unit_request_info_t *req; + + try { + this_arg = napi.get_cb_info(info); + + req = napi.get_request_info(this_arg); - status = napi_get_value_string_utf8(env, argv[1], buf->free, - buf_len, &have_buf_len); + req_data = (req_data_t *) req->data; - } else { - status = napi_get_buffer_info(env, argv[1], (void **) &ptr, - &have_buf_len); + napi.remove_wrap(req_data->sock_ref); + napi.remove_wrap(req_data->resp_ref); + napi.remove_wrap(req_data->conn_ref); - memcpy(buf->free, ptr, have_buf_len); + } catch (exception &e) { + napi.throw_error(e); + return nullptr; } - if (status != napi_ok) { - goto failed; + nxt_unit_request_done(req, NXT_UNIT_OK); + + return this_arg; +} + + +napi_value +Unit::websocket_send_frame(napi_env env, napi_callback_info info) +{ + int ret, iovec_len; + bool fin; + size_t buf_len; + uint32_t opcode, sc; + nxt_napi napi(env); + napi_value this_arg, frame, payload; + nxt_unit_request_info_t *req; + char status_code[2]; + struct iovec iov[2]; + + iovec_len = 0; + + try { + this_arg = napi.get_cb_info(info, frame); + + req = napi.get_request_info(this_arg); + + opcode = napi.get_value_uint32(napi.get_named_property(frame, + "opcode")); + if (opcode == NXT_WEBSOCKET_OP_CLOSE) { + sc = napi.get_value_uint32(napi.get_named_property(frame, + "closeStatus")); + status_code[0] = (sc >> 8) & 0xFF; + status_code[1] = sc & 0xFF; + + iov[iovec_len].iov_base = status_code; + iov[iovec_len].iov_len = 2; + iovec_len++; + } + + try { + fin = napi.get_value_bool(napi.get_named_property(frame, "fin")); + + } catch (exception &e) { + fin = true; + } + + payload = napi.get_named_property(frame, "binaryPayload"); + + if (napi.is_buffer(payload)) { + iov[iovec_len].iov_base = napi.get_buffer_info(payload, buf_len); + + } else { + buf_len = 0; + } + + } catch (exception &e) { + napi.throw_error(e); + return nullptr; } - buf->free += have_buf_len; + if (buf_len > 0) { + iov[iovec_len].iov_len = buf_len; + iovec_len++; + } - ret = nxt_unit_buf_send(buf); + ret = nxt_unit_websocket_sendv(req, opcode, fin ? 1 : 0, iov, iovec_len); if (ret != NXT_UNIT_OK) { goto failed; } @@ -678,34 +896,65 @@ Unit::response_write(napi_env env, napi_callback_info info) failed: - napi.throw_error("Failed to write body"); + napi.throw_error("Failed to send frame"); return nullptr; } napi_value -Unit::response_end(napi_env env, napi_callback_info info) +Unit::websocket_set_sock(napi_env env, napi_callback_info info) { - size_t argc; nxt_napi napi(env); - napi_value resp, this_arg, req_num; + napi_value this_arg, sock; + req_data_t *req_data; nxt_unit_request_info_t *req; - argc = 1; - try { - this_arg = napi.get_cb_info(info, argc, &resp); + this_arg = napi.get_cb_info(info, sock); - req_num = napi.get_named_property(resp, "_req_point"); - req = napi.get_request_info(req_num); + req = napi.get_request_info(sock); + + req_data = (req_data_t *) req->data; + req_data->conn_ref = napi.wrap(this_arg, req, conn_destroy); } catch (exception &e) { napi.throw_error(e); return nullptr; } - nxt_unit_request_done(req, NXT_UNIT_OK); - return this_arg; } + + +void +Unit::conn_destroy(napi_env env, void *nativeObject, void *finalize_hint) +{ + nxt_unit_request_info_t *req; + + req = (nxt_unit_request_info_t *) nativeObject; + + nxt_unit_warn(NULL, "conn_destroy: %p", req); +} + + +void +Unit::sock_destroy(napi_env env, void *nativeObject, void *finalize_hint) +{ + nxt_unit_request_info_t *req; + + req = (nxt_unit_request_info_t *) nativeObject; + + nxt_unit_warn(NULL, "sock_destroy: %p", req); +} + + +void +Unit::resp_destroy(napi_env env, void *nativeObject, void *finalize_hint) +{ + nxt_unit_request_info_t *req; + + req = (nxt_unit_request_info_t *) nativeObject; + + nxt_unit_warn(NULL, "resp_destroy: %p", req); +} |