diff options
Diffstat (limited to '')
-rw-r--r-- | src/nodejs/unit-http/http_server.js | 50 | ||||
-rw-r--r-- | src/nodejs/unit-http/unit.cpp | 271 | ||||
-rw-r--r-- | src/nodejs/unit-http/unit.h | 6 |
3 files changed, 224 insertions, 103 deletions
diff --git a/src/nodejs/unit-http/http_server.js b/src/nodejs/unit-http/http_server.js index d378e410..e59296ae 100644 --- a/src/nodejs/unit-http/http_server.js +++ b/src/nodejs/unit-http/http_server.js @@ -11,6 +11,7 @@ const util = require('util'); const unit_lib = require('./build/Release/unit-http'); const Socket = require('./socket'); const WebSocketFrame = require('./websocket_frame'); +const Readable = require('stream').Readable; function ServerResponse(req) { @@ -23,6 +24,7 @@ function ServerResponse(req) { req._response = this; this.socket = req.socket; this.connection = req.connection; + this.writable = true; } util.inherits(ServerResponse, EventEmitter); @@ -268,6 +270,7 @@ ServerResponse.prototype._writeBody = function(chunk, encoding, callback) { res = this._write(chunk, 0, contentLength); if (res < contentLength) { this.socket.writable = false; + this.writable = false; o = new BufferedOutput(this, res, chunk, encoding, callback); this.server._output.push(o); @@ -328,6 +331,8 @@ ServerResponse.prototype.end = function end(chunk, encoding, callback) { if (typeof callback === 'function') { callback(); } + + this.emit("finish"); }); this.finished = true; @@ -337,15 +342,14 @@ ServerResponse.prototype.end = function end(chunk, encoding, callback) { }; function ServerRequest(server, socket) { - EventEmitter.call(this); + Readable.call(this); this.server = server; this.socket = socket; this.connection = socket; + this._pushed_eofchunk = false; } -util.inherits(ServerRequest, EventEmitter); - -ServerRequest.prototype.unpipe = undefined; +util.inherits(ServerRequest, Readable); ServerRequest.prototype.setTimeout = function setTimeout(msecs, callback) { this.timeout = msecs; @@ -377,35 +381,21 @@ ServerRequest.prototype.STATUS_CODES = function STATUS_CODES() { return http.STATUS_CODES; }; -ServerRequest.prototype.listeners = function listeners() { - return []; -}; - -ServerRequest.prototype.resume = function resume() { - return []; -}; +ServerRequest.prototype._request_read = unit_lib.request_read; -/* - * The "on" method is overridden to defer reading data until user code is - * ready, that is (ev === "data"). This can occur after req.emit("end") is - * executed, since the user code can be scheduled asynchronously by Promises - * and so on. Passing the data is postponed by process.nextTick() until - * the "on" method caller completes. - */ -ServerRequest.prototype.on = function on(ev, fn) { - Server.prototype.on.call(this, ev, fn); +ServerRequest.prototype._read = function _read(n) { + const b = this._request_read(n); - if (ev === "data") { - process.nextTick(function () { - if (this._data.length !== 0) { - this.emit("data", this._data); - } + if (b != null) { + this.push(b); + } - }.bind(this)); + if (!this._pushed_eofchunk && (b == null || b.length < n)) { + this._pushed_eofchunk = true; + this.push(null); } }; -ServerRequest.prototype.addListener = ServerRequest.prototype.on; function Server(requestListener) { EventEmitter.call(this); @@ -472,11 +462,6 @@ Server.prototype.emit_request = function (req, res) { } else { this.emit("request", req, res); } - - process.nextTick(() => { - req.emit("finish"); - req.emit("end"); - }); }; Server.prototype.emit_close = function () { @@ -523,6 +508,7 @@ Server.prototype.emit_drain = function () { } resp.socket.writable = true; + resp.writable = true; process.nextTick(() => { resp.emit("drain"); diff --git a/src/nodejs/unit-http/unit.cpp b/src/nodejs/unit-http/unit.cpp index c5bca49a..589eca3f 100644 --- a/src/nodejs/unit-http/unit.cpp +++ b/src/nodejs/unit-http/unit.cpp @@ -13,25 +13,140 @@ #include <nxt_unit_websocket.h> -static void delete_port_data(uv_handle_t* handle); - napi_ref Unit::constructor_; struct port_data_t { - nxt_unit_ctx_t *ctx; - nxt_unit_port_t *port; - uv_poll_t poll; + port_data_t(nxt_unit_ctx_t *c, nxt_unit_port_t *p); + + void process_port_msg(); + void stop(); + + template<typename T> + static port_data_t *get(T *handle); + + static void read_callback(uv_poll_t *handle, int status, int events); + static void timer_callback(uv_timer_t *handle); + static void delete_data(uv_handle_t* handle); + + nxt_unit_ctx_t *ctx; + nxt_unit_port_t *port; + uv_poll_t poll; + uv_timer_t timer; + int ref_count; + bool scheduled; + bool stopped; }; struct req_data_t { napi_ref sock_ref; + napi_ref req_ref; napi_ref resp_ref; napi_ref conn_ref; }; +port_data_t::port_data_t(nxt_unit_ctx_t *c, nxt_unit_port_t *p) : + ctx(c), port(p), ref_count(0), scheduled(false), stopped(false) +{ + timer.type = UV_UNKNOWN_HANDLE; +} + + +void +port_data_t::process_port_msg() +{ + int rc, err; + + rc = nxt_unit_process_port_msg(ctx, port); + + if (rc != NXT_UNIT_OK) { + return; + } + + if (timer.type == UV_UNKNOWN_HANDLE) { + err = uv_timer_init(poll.loop, &timer); + if (err < 0) { + nxt_unit_warn(ctx, "Failed to init uv.poll"); + return; + } + + ref_count++; + timer.data = this; + } + + if (!scheduled && !stopped) { + uv_timer_start(&timer, timer_callback, 0, 0); + + scheduled = true; + } +} + + +void +port_data_t::stop() +{ + stopped = true; + + uv_poll_stop(&poll); + + uv_close((uv_handle_t *) &poll, delete_data); + + if (timer.type == UV_UNKNOWN_HANDLE) { + return; + } + + uv_timer_stop(&timer); + + uv_close((uv_handle_t *) &timer, delete_data); +} + + +template<typename T> +port_data_t * +port_data_t::get(T *handle) +{ + return (port_data_t *) handle->data; +} + + +void +port_data_t::read_callback(uv_poll_t *handle, int status, int events) +{ + get(handle)->process_port_msg(); +} + + +void +port_data_t::timer_callback(uv_timer_t *handle) +{ + port_data_t *data; + + data = get(handle); + + data->scheduled = false; + if (data->stopped) { + return; + } + + data->process_port_msg(); +} + + +void +port_data_t::delete_data(uv_handle_t* handle) +{ + port_data_t *data; + + data = get(handle); + + if (--data->ref_count <= 0) { + delete data; + } +} + + Unit::Unit(napi_env env, napi_value jsthis): nxt_napi(env), wrapper_(wrap(jsthis, this, destroy)), @@ -65,6 +180,7 @@ Unit::init(napi_env env, napi_value exports) constructor_ = napi.create_reference(ctor); napi.set_named_property(exports, "Unit", ctor); + napi.set_named_property(exports, "request_read", request_read); napi.set_named_property(exports, "response_send_headers", response_send_headers); napi.set_named_property(exports, "response_write", response_write); @@ -206,7 +322,7 @@ Unit::request_handler(nxt_unit_request_info_t *req) server_obj = get_server_object(); socket = create_socket(server_obj, req); - request = create_request(server_obj, socket); + request = create_request(server_obj, socket, req); response = create_response(server_obj, request, req); create_headers(req, request); @@ -301,6 +417,7 @@ Unit::close_handler(nxt_unit_request_info_t *req) nxt_napi::create(0)); remove_wrap(req_data->sock_ref); + remove_wrap(req_data->req_ref); remove_wrap(req_data->resp_ref); remove_wrap(req_data->conn_ref); @@ -350,59 +467,50 @@ Unit::shm_ack_handler(nxt_unit_ctx_t *ctx) } -static void -nxt_uv_read_callback(uv_poll_t *handle, int status, int events) -{ - port_data_t *data; - - data = (port_data_t *) handle->data; - - nxt_unit_process_port_msg(data->ctx, data->port); -} - - int Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) { - int err; - Unit *obj; - uv_loop_t *loop; - port_data_t *data; - napi_status status; + int err; + Unit *obj; + uv_loop_t *loop; + port_data_t *data; + napi_status status; if (port->in_fd != -1) { - obj = reinterpret_cast<Unit *>(ctx->unit->data); - if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) { nxt_unit_warn(ctx, "fcntl(%d, O_NONBLOCK) failed: %s (%d)", port->in_fd, strerror(errno), errno); return -1; } + obj = reinterpret_cast<Unit *>(ctx->unit->data); + status = napi_get_uv_event_loop(obj->env(), &loop); if (status != napi_ok) { nxt_unit_warn(ctx, "Failed to get uv.loop"); return NXT_UNIT_ERROR; } - data = new port_data_t; + data = new port_data_t(ctx, port); err = uv_poll_init(loop, &data->poll, port->in_fd); if (err < 0) { nxt_unit_warn(ctx, "Failed to init uv.poll"); + delete data; return NXT_UNIT_ERROR; } - err = uv_poll_start(&data->poll, UV_READABLE, nxt_uv_read_callback); + err = uv_poll_start(&data->poll, UV_READABLE, + port_data_t::read_callback); if (err < 0) { nxt_unit_warn(ctx, "Failed to start uv.poll"); + delete data; return NXT_UNIT_ERROR; } port->data = data; - data->ctx = ctx; - data->port = port; + data->ref_count++; data->poll.data = data; } @@ -418,26 +526,11 @@ Unit::remove_port(nxt_unit_t *unit, nxt_unit_port_t *port) if (port->data != NULL) { data = (port_data_t *) port->data; - if (data->port == port) { - uv_poll_stop(&data->poll); - - uv_close((uv_handle_t *) &data->poll, delete_port_data); - } + data->stop(); } } -static void -delete_port_data(uv_handle_t* handle) -{ - port_data_t *data; - - data = (port_data_t *) handle->data; - - delete data; -} - - void Unit::quit_cb(nxt_unit_ctx_t *ctx) { @@ -488,9 +581,8 @@ Unit::get_server_object() void Unit::create_headers(nxt_unit_request_info_t *req, napi_value request) { - void *data; uint32_t i; - napi_value headers, raw_headers, buffer; + napi_value headers, raw_headers; napi_status status; nxt_unit_request_t *r; @@ -515,11 +607,6 @@ Unit::create_headers(nxt_unit_request_info_t *req, napi_value request) set_named_property(request, "url", r->target, r->target_length); set_named_property(request, "_websocket_handshake", r->websocket_handshake); - - buffer = create_buffer((size_t) req->content_length, &data); - nxt_unit_request_read(req, data, req->content_length); - - set_named_property(request, "_data", buffer); } @@ -577,13 +664,20 @@ Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req) napi_value -Unit::create_request(napi_value server_obj, napi_value socket) +Unit::create_request(napi_value server_obj, napi_value socket, + nxt_unit_request_info_t *req) { - napi_value constructor; + napi_value constructor, res; + req_data_t *req_data; constructor = get_named_property(server_obj, "ServerRequest"); - return new_instance(constructor, server_obj, socket); + res = new_instance(constructor, server_obj, socket); + + req_data = (req_data_t *) req->data; + req_data->req_ref = wrap(res, req, req_destroy); + + return res; } @@ -643,6 +737,47 @@ Unit::create_websocket_frame(napi_value server_obj, napi_value +Unit::request_read(napi_env env, napi_callback_info info) +{ + void *data; + uint32_t wm; + nxt_napi napi(env); + napi_value this_arg, argv, buffer; + nxt_unit_request_info_t *req; + + try { + this_arg = napi.get_cb_info(info, argv); + + try { + req = napi.get_request_info(this_arg); + + } catch (exception &e) { + return nullptr; + } + + if (req->content_length == 0) { + return nullptr; + } + + wm = napi.get_value_uint32(argv); + + if (wm > req->content_length) { + wm = req->content_length; + } + + buffer = napi.create_buffer((size_t) wm, &data); + nxt_unit_request_read(req, data, wm); + + } catch (exception &e) { + napi.throw_error(e); + return nullptr; + } + + return buffer; +} + + +napi_value Unit::response_send_headers(napi_env env, napi_callback_info info) { int ret; @@ -884,6 +1019,7 @@ Unit::response_end(napi_env env, napi_callback_info info) req_data = (req_data_t *) req->data; napi.remove_wrap(req_data->sock_ref); + napi.remove_wrap(req_data->req_ref); napi.remove_wrap(req_data->resp_ref); napi.remove_wrap(req_data->conn_ref); @@ -998,33 +1134,28 @@ Unit::websocket_set_sock(napi_env env, napi_callback_info info) void -Unit::conn_destroy(napi_env env, void *nativeObject, void *finalize_hint) +Unit::conn_destroy(napi_env env, void *r, void *finalize_hint) { - nxt_unit_request_info_t *req; - - req = (nxt_unit_request_info_t *) nativeObject; - - nxt_unit_warn(NULL, "conn_destroy: %p", req); + nxt_unit_req_debug(NULL, "conn_destroy: %p", r); } void -Unit::sock_destroy(napi_env env, void *nativeObject, void *finalize_hint) +Unit::sock_destroy(napi_env env, void *r, void *finalize_hint) { - nxt_unit_request_info_t *req; - - req = (nxt_unit_request_info_t *) nativeObject; - - nxt_unit_warn(NULL, "sock_destroy: %p", req); + nxt_unit_req_debug(NULL, "sock_destroy: %p", r); } void -Unit::resp_destroy(napi_env env, void *nativeObject, void *finalize_hint) +Unit::req_destroy(napi_env env, void *r, void *finalize_hint) { - nxt_unit_request_info_t *req; + nxt_unit_req_debug(NULL, "req_destroy: %p", r); +} - req = (nxt_unit_request_info_t *) nativeObject; - nxt_unit_warn(NULL, "resp_destroy: %p", req); +void +Unit::resp_destroy(napi_env env, void *r, void *finalize_hint) +{ + nxt_unit_req_debug(NULL, "resp_destroy: %p", r); } diff --git a/src/nodejs/unit-http/unit.h b/src/nodejs/unit-http/unit.h index 07823c26..4ef40d45 100644 --- a/src/nodejs/unit-http/unit.h +++ b/src/nodejs/unit-http/unit.h @@ -21,6 +21,7 @@ private: static void destroy(napi_env env, void *nativeObject, void *finalize_hint); static void conn_destroy(napi_env env, void *nativeObject, void *finalize_hint); static void sock_destroy(napi_env env, void *nativeObject, void *finalize_hint); + static void req_destroy(napi_env env, void *nativeObject, void *finalize_hint); static void resp_destroy(napi_env env, void *nativeObject, void *finalize_hint); static napi_value create_server(napi_env env, napi_callback_info info); @@ -50,7 +51,8 @@ private: napi_value create_socket(napi_value server_obj, nxt_unit_request_info_t *req); - napi_value create_request(napi_value server_obj, napi_value socket); + napi_value create_request(napi_value server_obj, napi_value socket, + nxt_unit_request_info_t *req); napi_value create_response(napi_value server_obj, napi_value request, nxt_unit_request_info_t *req); @@ -58,6 +60,8 @@ private: napi_value create_websocket_frame(napi_value server_obj, nxt_unit_websocket_frame_t *ws); + static napi_value request_read(napi_env env, napi_callback_info info); + static napi_value response_send_headers(napi_env env, napi_callback_info info); |