/* * Copyright (C) NGINX, Inc. */ #include "unit.h" #include <unistd.h> #include <fcntl.h> #include <uv.h> #include <nxt_unit_websocket.h> napi_ref Unit::constructor_; struct port_data_t { port_data_t(nxt_unit_ctx_t *c, nxt_unit_port_t *p); void process_port_msg(); void stop(); template<typename T> static port_data_t *get(T *handle); static void read_callback(uv_poll_t *handle, int status, int events); static void timer_callback(uv_timer_t *handle); static void delete_data(uv_handle_t* handle); nxt_unit_ctx_t *ctx; nxt_unit_port_t *port; uv_poll_t poll; uv_timer_t timer; int ref_count; bool scheduled; bool stopped; }; struct req_data_t { napi_ref sock_ref; napi_ref req_ref; napi_ref resp_ref; napi_ref conn_ref; }; port_data_t::port_data_t(nxt_unit_ctx_t *c, nxt_unit_port_t *p) : ctx(c), port(p), ref_count(0), scheduled(false), stopped(false) { timer.type = UV_UNKNOWN_HANDLE; } void port_data_t::process_port_msg() { int rc, err; rc = nxt_unit_process_port_msg(ctx, port); if (rc != NXT_UNIT_OK) { return; } if (timer.type == UV_UNKNOWN_HANDLE) { err = uv_timer_init(poll.loop, &timer); if (err < 0) { nxt_unit_warn(ctx, "Failed to init uv.poll"); return; } ref_count++; timer.data = this; } if (!scheduled && !stopped) { uv_timer_start(&timer, timer_callback, 0, 0); scheduled = true; } } void port_data_t::stop() { stopped = true; uv_poll_stop(&poll); uv_close((uv_handle_t *) &poll, delete_data); if (timer.type == UV_UNKNOWN_HANDLE) { return; } uv_timer_stop(&timer); uv_close((uv_handle_t *) &timer, delete_data); } template<typename T> port_data_t * port_data_t::get(T *handle) { return (port_data_t *) handle->data; } void port_data_t::read_callback(uv_poll_t *handle, int status, int events) { get(handle)->process_port_msg(); } void port_data_t::timer_callback(uv_timer_t *handle) { port_data_t *data; data = get(handle); data->scheduled = false; if (data->stopped) { return; } data->process_port_msg(); } void port_data_t::delete_data(uv_handle_t* handle) { port_data_t *data; data = get(handle); if (--data->ref_count <= 0) { delete data; } } Unit::Unit(napi_env env, napi_value jsthis): nxt_napi(env), wrapper_(wrap(jsthis, this, destroy)), unit_ctx_(nullptr) { nxt_unit_debug(NULL, "Unit::Unit()"); } Unit::~Unit() { delete_reference(wrapper_); nxt_unit_debug(NULL, "Unit::~Unit()"); } napi_value Unit::init(napi_env env, napi_value exports) { nxt_napi napi(env); napi_value ctor; napi_property_descriptor unit_props[] = { { "createServer", 0, create_server, 0, 0, 0, napi_default, 0 }, { "listen", 0, listen, 0, 0, 0, napi_default, 0 }, }; try { ctor = napi.define_class("Unit", create, 2, unit_props); constructor_ = napi.create_reference(ctor); napi.set_named_property(exports, "Unit", ctor); napi.set_named_property(exports, "request_read", request_read); napi.set_named_property(exports, "response_send_headers", response_send_headers); napi.set_named_property(exports, "response_write", response_write); napi.set_named_property(exports, "response_end", response_end); napi.set_named_property(exports, "websocket_send_frame", websocket_send_frame); napi.set_named_property(exports, "websocket_set_sock", websocket_set_sock); napi.set_named_property(exports, "buf_min", nxt_unit_buf_min()); napi.set_named_property(exports, "buf_max", nxt_unit_buf_max()); } catch (exception &e) { napi.throw_error(e); return nullptr; } return exports; } void Unit::destroy(napi_env env, void *nativeObject, void *finalize_hint) { Unit *obj = reinterpret_cast<Unit *>(nativeObject); delete obj; } napi_value Unit::create(napi_env env, napi_callback_info info) { nxt_napi napi(env); napi_value target, ctor, instance, jsthis; try { target = napi.get_new_target(info); if (target != nullptr) { /* Invoked as constructor: `new Unit(...)`. */ jsthis = napi.get_cb_info(info); new Unit(env, jsthis); napi.create_reference(jsthis); return jsthis; } /* Invoked as plain function `Unit(...)`, turn into construct call. */ ctor = napi.get_reference_value(constructor_); instance = napi.new_instance(ctor); napi.create_reference(instance); } catch (exception &e) { napi.throw_error(e); return nullptr; } return instance; } napi_value Unit::create_server(napi_env env, napi_callback_info info) { Unit *obj; size_t argc; nxt_napi napi(env); napi_value jsthis, argv; nxt_unit_init_t unit_init; argc = 1; try { jsthis = napi.get_cb_info(info, argc, &argv); obj = (Unit *) napi.unwrap(jsthis); } catch (exception &e) { napi.throw_error(e); return nullptr; } memset(&unit_init, 0, sizeof(nxt_unit_init_t)); unit_init.data = obj; unit_init.callbacks.request_handler = request_handler_cb; unit_init.callbacks.websocket_handler = websocket_handler_cb; unit_init.callbacks.close_handler = close_handler_cb; unit_init.callbacks.shm_ack_handler = shm_ack_handler_cb; unit_init.callbacks.add_port = add_port; unit_init.callbacks.remove_port = remove_port; unit_init.callbacks.quit = quit_cb; unit_init.request_data_size = sizeof(req_data_t); obj->unit_ctx_ = nxt_unit_init(&unit_init); if (obj->unit_ctx_ == NULL) { goto failed; } return nullptr; failed: napi_throw_error(env, NULL, "Failed to create Unit object"); return nullptr; } napi_value Unit::listen(napi_env env, napi_callback_info info) { return nullptr; } void Unit::request_handler_cb(nxt_unit_request_info_t *req) { Unit *obj; obj = reinterpret_cast<Unit *>(req->unit->data); obj->request_handler(req); } void Unit::request_handler(nxt_unit_request_info_t *req) { napi_value socket, request, response, server_obj, emit_request; memset(req->data, 0, sizeof(req_data_t)); try { nxt_handle_scope scope(env()); server_obj = get_server_object(); socket = create_socket(server_obj, req); request = create_request(server_obj, socket, req); response = create_response(server_obj, request, req); create_headers(req, request); emit_request = get_named_property(server_obj, "emit_request"); nxt_async_context async_context(env(), "request_handler"); nxt_callback_scope async_scope(async_context); make_callback(async_context, server_obj, emit_request, request, response); } catch (exception &e) { nxt_unit_req_warn(req, "request_handler: %s", e.str); } } void Unit::websocket_handler_cb(nxt_unit_websocket_frame_t *ws) { Unit *obj; obj = reinterpret_cast<Unit *>(ws->req->unit->data); obj->websocket_handler(ws); } void Unit::websocket_handler(nxt_unit_websocket_frame_t *ws) { napi_value frame, server_obj, process_frame, conn; req_data_t *req_data; req_data = (req_data_t *) ws->req->data; try { nxt_handle_scope scope(env()); server_obj = get_server_object(); frame = create_websocket_frame(server_obj, ws); conn = get_reference_value(req_data->conn_ref); process_frame = get_named_property(conn, "processFrame"); nxt_async_context async_context(env(), "websocket_handler"); nxt_callback_scope async_scope(async_context); make_callback(async_context, conn, process_frame, frame); } catch (exception &e) { nxt_unit_req_warn(ws->req, "websocket_handler: %s", e.str); } nxt_unit_websocket_done(ws); } void Unit::close_handler_cb(nxt_unit_request_info_t *req) { Unit *obj; obj = reinterpret_cast<Unit *>(req->unit->data); obj->close_handler(req); } void Unit::close_handler(nxt_unit_request_info_t *req) { napi_value conn_handle_close, conn; req_data_t *req_data; req_data = (req_data_t *) req->data; try { nxt_handle_scope scope(env()); conn = get_reference_value(req_data->conn_ref); conn_handle_close = get_named_property(conn, "handleSocketClose"); nxt_async_context async_context(env(), "close_handler"); nxt_callback_scope async_scope(async_context); make_callback(async_context, conn, conn_handle_close, nxt_napi::create(0)); remove_wrap(req_data->sock_ref); remove_wrap(req_data->req_ref); remove_wrap(req_data->resp_ref); remove_wrap(req_data->conn_ref); } catch (exception &e) { nxt_unit_req_warn(req, "close_handler: %s", e.str); nxt_unit_request_done(req, NXT_UNIT_ERROR); return; } nxt_unit_request_done(req, NXT_UNIT_OK); } void Unit::shm_ack_handler_cb(nxt_unit_ctx_t *ctx) { Unit *obj; obj = reinterpret_cast<Unit *>(ctx->unit->data); obj->shm_ack_handler(ctx); } void Unit::shm_ack_handler(nxt_unit_ctx_t *ctx) { napi_value server_obj, emit_drain; try { nxt_handle_scope scope(env()); server_obj = get_server_object(); emit_drain = get_named_property(server_obj, "emit_drain"); nxt_async_context async_context(env(), "shm_ack_handler"); nxt_callback_scope async_scope(async_context); make_callback(async_context, server_obj, emit_drain); } catch (exception &e) { nxt_unit_warn(ctx, "shm_ack_handler: %s", e.str); } } int Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) { int err; Unit *obj; uv_loop_t *loop; port_data_t *data; napi_status status; if (port->in_fd != -1) { if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) { nxt_unit_warn(ctx, "fcntl(%d, O_NONBLOCK) failed: %s (%d)", port->in_fd, strerror(errno), errno); return -1; } obj = reinterpret_cast<Unit *>(ctx->unit->data); status = napi_get_uv_event_loop(obj->env(), &loop); if (status != napi_ok) { nxt_unit_warn(ctx, "Failed to get uv.loop"); return NXT_UNIT_ERROR; } data = new port_data_t(ctx, port); err = uv_poll_init(loop, &data->poll, port->in_fd); if (err < 0) { nxt_unit_warn(ctx, "Failed to init uv.poll"); delete data; return NXT_UNIT_ERROR; } err = uv_poll_start(&data->poll, UV_READABLE, port_data_t::read_callback); if (err < 0) { nxt_unit_warn(ctx, "Failed to start uv.poll"); delete data; return NXT_UNIT_ERROR; } port->data = data; data->ref_count++; data->poll.data = data; } return NXT_UNIT_OK; } void Unit::remove_port(nxt_unit_t *unit, nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) { port_data_t *data; if (port->data != NULL && ctx != NULL) { data = (port_data_t *) port->data; data->stop(); } } void Unit::quit_cb(nxt_unit_ctx_t *ctx) { Unit *obj; obj = reinterpret_cast<Unit *>(ctx->unit->data); obj->quit(ctx); } void Unit::quit(nxt_unit_ctx_t *ctx) { napi_value server_obj, emit_close; try { nxt_handle_scope scope(env()); server_obj = get_server_object(); emit_close = get_named_property(server_obj, "emit_close"); nxt_async_context async_context(env(), "unit_quit"); nxt_callback_scope async_scope(async_context); make_callback(async_context, server_obj, emit_close); } catch (exception &e) { nxt_unit_debug(ctx, "quit: %s", e.str); } nxt_unit_done(ctx); } napi_value Unit::get_server_object() { napi_value unit_obj; unit_obj = get_reference_value(wrapper_); return get_named_property(unit_obj, "server"); } void Unit::create_headers(nxt_unit_request_info_t *req, napi_value request) { uint32_t i; napi_value headers, raw_headers; napi_status status; nxt_unit_request_t *r; r = req->request; headers = create_object(); status = napi_create_array_with_length(env(), r->fields_count * 2, &raw_headers); if (status != napi_ok) { throw exception("Failed to create array"); } for (i = 0; i < r->fields_count; i++) { append_header(r->fields + i, headers, raw_headers, i); } set_named_property(request, "headers", headers); set_named_property(request, "rawHeaders", raw_headers); set_named_property(request, "httpVersion", r->version, r->version_length); set_named_property(request, "method", r->method, r->method_length); set_named_property(request, "url", r->target, r->target_length); set_named_property(request, "_websocket_handshake", r->websocket_handshake); } inline char lowcase(char c) { return (c >= 'A' && c <= 'Z') ? (c | 0x20) : c; } inline void Unit::append_header(nxt_unit_field_t *f, napi_value headers, napi_value raw_headers, uint32_t idx) { char *name; uint8_t i; napi_value str, vstr; name = (char *) nxt_unit_sptr_get(&f->name); str = create_string_latin1(name, f->name_length); for (i = 0; i < f->name_length; i++) { name[i] = lowcase(name[i]); } vstr = set_named_property(headers, name, f->value, f->value_length); set_element(raw_headers, idx * 2, str); set_element(raw_headers, idx * 2 + 1, vstr); } napi_value Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req) { napi_value constructor, res; req_data_t *req_data; nxt_unit_request_t *r; r = req->request; constructor = get_named_property(server_obj, "Socket"); res = new_instance(constructor); req_data = (req_data_t *) req->data; req_data->sock_ref = wrap(res, req, sock_destroy); set_named_property(res, "remoteAddress", r->remote, r->remote_length); set_named_property(res, "localAddress", r->local_addr, r->local_addr_length); return res; } napi_value Unit::create_request(napi_value server_obj, napi_value socket, nxt_unit_request_info_t *req) { napi_value constructor, res; req_data_t *req_data; constructor = get_named_property(server_obj, "ServerRequest"); res = new_instance(constructor, server_obj, socket); req_data = (req_data_t *) req->data; req_data->req_ref = wrap(res, req, req_destroy); return res; } napi_value Unit::create_response(napi_value server_obj, napi_value request, nxt_unit_request_info_t *req) { napi_value constructor, res; req_data_t *req_data; constructor = get_named_property(server_obj, "ServerResponse"); res = new_instance(constructor, request); req_data = (req_data_t *) req->data; req_data->resp_ref = wrap(res, req, resp_destroy); return res; } napi_value Unit::create_websocket_frame(napi_value server_obj, nxt_unit_websocket_frame_t *ws) { void *data; napi_value constructor, res, buffer; uint8_t sc[2]; constructor = get_named_property(server_obj, "WebSocketFrame"); res = new_instance(constructor); set_named_property(res, "fin", (bool) ws->header->fin); set_named_property(res, "opcode", ws->header->opcode); set_named_property(res, "length", (int64_t) ws->payload_len); if (ws->header->opcode == NXT_WEBSOCKET_OP_CLOSE) { if (ws->payload_len >= 2) { nxt_unit_websocket_read(ws, sc, 2); set_named_property(res, "closeStatus", (((uint16_t) sc[0]) << 8) | sc[1]); } else { set_named_property(res, "closeStatus", -1); } } buffer = create_buffer((size_t) ws->content_length, &data); nxt_unit_websocket_read(ws, data, ws->content_length); set_named_property(res, "binaryPayload", buffer); return res; } napi_value Unit::request_read(napi_env env, napi_callback_info info) { void *data; uint32_t wm; nxt_napi napi(env); napi_value this_arg, argv, buffer; nxt_unit_request_info_t *req; try { this_arg = napi.get_cb_info(info, argv); try { req = napi.get_request_info(this_arg); } catch (exception &e) { return nullptr; } if (req->content_length == 0) { return nullptr; } wm = napi.get_value_uint32(argv); if (wm > req->content_length) { wm = req->content_length; } buffer = napi.create_buffer((size_t) wm, &data); nxt_unit_request_read(req, data, wm); } catch (exception &e) { napi.throw_error(e); return nullptr; } return buffer; } napi_value Unit::response_send_headers(napi_env env, napi_callback_info info) { int ret; char *ptr, *name_ptr; bool is_array; size_t argc, name_len, value_len; uint32_t status_code, header_len, keys_len, array_len; uint32_t keys_count, i, j; uint16_t hash; nxt_napi napi(env); napi_value this_arg, headers, keys, name, value, array_val; napi_value array_entry; napi_valuetype val_type; nxt_unit_field_t *f; nxt_unit_request_info_t *req; napi_value argv[4]; argc = 4; try { this_arg = napi.get_cb_info(info, argc, argv); if (argc != 4) { napi.throw_error("Wrong args count. Expected: " "statusCode, headers, headers count, " "headers length"); return nullptr; } req = napi.get_request_info(this_arg); status_code = napi.get_value_uint32(argv[0]); keys_count = napi.get_value_uint32(argv[2]); header_len = napi.get_value_uint32(argv[3]); headers = argv[1]; ret = nxt_unit_response_init(req, status_code, keys_count, header_len); if (ret != NXT_UNIT_OK) { napi.throw_error("Failed to create response"); return nullptr; } /* * Each name and value are 0-terminated by libunit. * Need to add extra 2 bytes for each header. */ header_len += keys_count * 2; keys = napi.get_property_names(headers); keys_len = napi.get_array_length(keys); ptr = req->response_buf->free; for (i = 0; i < keys_len; i++) { name = napi.get_element(keys, i); array_entry = napi.get_property(headers, name); name = napi.get_element(array_entry, 0); value = napi.get_element(array_entry, 1); name_len = napi.get_value_string_latin1(name, ptr, header_len); name_ptr = ptr; ptr += name_len + 1; header_len -= name_len + 1; hash = nxt_unit_field_hash(name_ptr, name_len); is_array = napi.is_array(value); if (is_array) { array_len = napi.get_array_length(value); for (j = 0; j < array_len; j++) { array_val = napi.get_element(value, j); val_type = napi.type_of(array_val); if (val_type != napi_string) { array_val = napi.coerce_to_string(array_val); } value_len = napi.get_value_string_latin1(array_val, ptr, header_len); f = req->response->fields + req->response->fields_count; f->skip = 0; nxt_unit_sptr_set(&f->name, name_ptr); f->name_length = name_len; f->hash = hash; nxt_unit_sptr_set(&f->value, ptr); f->value_length = (uint32_t) value_len; ptr += value_len + 1; header_len -= value_len + 1; req->response->fields_count++; } } else { val_type = napi.type_of(value); if (val_type != napi_string) { value = napi.coerce_to_string(value); } value_len = napi.get_value_string_latin1(value, ptr, header_len); f = req->response->fields + req->response->fields_count; f->skip = 0; nxt_unit_sptr_set(&f->name, name_ptr); f->name_length = name_len; f->hash = hash; nxt_unit_sptr_set(&f->value, ptr); f->value_length = (uint32_t) value_len; ptr += value_len + 1; header_len -= value_len + 1; req->response->fields_count++; } } } catch (exception &e) { napi.throw_error(e); return nullptr; } req->response_buf->free = ptr; ret = nxt_unit_response_send(req); if (ret != NXT_UNIT_OK) { napi.throw_error("Failed to send response"); return nullptr; } return this_arg; } napi_value Unit::response_write(napi_env env, napi_callback_info info) { int ret; void *ptr; size_t argc, have_buf_len; ssize_t res_len; uint32_t buf_start, buf_len; nxt_napi napi(env); napi_value this_arg; nxt_unit_buf_t *buf; napi_valuetype buf_type; nxt_unit_request_info_t *req; napi_value argv[3]; argc = 3; try { this_arg = napi.get_cb_info(info, argc, argv); if (argc != 3) { throw exception("Wrong args count. Expected: " "chunk, start, length"); } req = napi.get_request_info(this_arg); buf_type = napi.type_of(argv[0]); buf_start = napi.get_value_uint32(argv[1]); buf_len = napi.get_value_uint32(argv[2]) + 1; if (buf_type == napi_string) { /* TODO: will work only for utf8 content-type */ if (req->response_buf != NULL && req->response_buf->end >= req->response_buf->free + buf_len) { buf = req->response_buf; } else { buf = nxt_unit_response_buf_alloc(req, buf_len); if (buf == NULL) { throw exception("Failed to allocate response buffer"); } } have_buf_len = napi.get_value_string_utf8(argv[0], buf->free, buf_len); buf->free += have_buf_len; ret = nxt_unit_buf_send(buf); if (ret == NXT_UNIT_OK) { res_len = have_buf_len; } } else { ptr = napi.get_buffer_info(argv[0], have_buf_len); if (buf_start > 0) { ptr = ((uint8_t *) ptr) + buf_start; have_buf_len -= buf_start; } res_len = nxt_unit_response_write_nb(req, ptr, have_buf_len, 0); ret = res_len < 0 ? -res_len : (int) NXT_UNIT_OK; } if (ret != NXT_UNIT_OK) { throw exception("Failed to send body buf"); } } catch (exception &e) { napi.throw_error(e); return nullptr; } return napi.create((int64_t) res_len); } napi_value Unit::response_end(napi_env env, napi_callback_info info) { nxt_napi napi(env); napi_value this_arg; req_data_t *req_data; nxt_unit_request_info_t *req; try { this_arg = napi.get_cb_info(info); req = napi.get_request_info(this_arg); req_data = (req_data_t *) req->data; napi.remove_wrap(req_data->sock_ref); napi.remove_wrap(req_data->req_ref); napi.remove_wrap(req_data->resp_ref); napi.remove_wrap(req_data->conn_ref); } catch (exception &e) { napi.throw_error(e); return nullptr; } nxt_unit_request_done(req, NXT_UNIT_OK); return this_arg; } napi_value Unit::websocket_send_frame(napi_env env, napi_callback_info info) { int ret, iovec_len; bool fin; size_t buf_len; uint32_t opcode, sc; nxt_napi napi(env); napi_value this_arg, frame, payload; nxt_unit_request_info_t *req; char status_code[2]; struct iovec iov[2]; iovec_len = 0; try { this_arg = napi.get_cb_info(info, frame); req = napi.get_request_info(this_arg); opcode = napi.get_value_uint32(napi.get_named_property(frame, "opcode")); if (opcode == NXT_WEBSOCKET_OP_CLOSE) { sc = napi.get_value_uint32(napi.get_named_property(frame, "closeStatus")); status_code[0] = (sc >> 8) & 0xFF; status_code[1] = sc & 0xFF; iov[iovec_len].iov_base = status_code; iov[iovec_len].iov_len = 2; iovec_len++; } try { fin = napi.get_value_bool(napi.get_named_property(frame, "fin")); } catch (exception &e) { fin = true; } payload = napi.get_named_property(frame, "binaryPayload"); if (napi.is_buffer(payload)) { iov[iovec_len].iov_base = napi.get_buffer_info(payload, buf_len); } else { buf_len = 0; } } catch (exception &e) { napi.throw_error(e); return nullptr; } if (buf_len > 0) { iov[iovec_len].iov_len = buf_len; iovec_len++; } ret = nxt_unit_websocket_sendv(req, opcode, fin ? 1 : 0, iov, iovec_len); if (ret != NXT_UNIT_OK) { goto failed; } return this_arg; failed: napi.throw_error("Failed to send frame"); return nullptr; } napi_value Unit::websocket_set_sock(napi_env env, napi_callback_info info) { nxt_napi napi(env); napi_value this_arg, sock; req_data_t *req_data; nxt_unit_request_info_t *req; try { this_arg = napi.get_cb_info(info, sock); req = napi.get_request_info(sock); req_data = (req_data_t *) req->data; req_data->conn_ref = napi.wrap(this_arg, req, conn_destroy); } catch (exception &e) { napi.throw_error(e); return nullptr; } return this_arg; } void Unit::conn_destroy(napi_env env, void *r, void *finalize_hint) { nxt_unit_req_debug(NULL, "conn_destroy: %p", r); } void Unit::sock_destroy(napi_env env, void *r, void *finalize_hint) { nxt_unit_req_debug(NULL, "sock_destroy: %p", r); } void Unit::req_destroy(napi_env env, void *r, void *finalize_hint) { nxt_unit_req_debug(NULL, "req_destroy: %p", r); } void Unit::resp_destroy(napi_env env, void *r, void *finalize_hint) { nxt_unit_req_debug(NULL, "resp_destroy: %p", r); }