summaryrefslogtreecommitdiffhomepage
path: root/src/nodejs/unit-http
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/nodejs/unit-http/http_server.js50
-rw-r--r--src/nodejs/unit-http/unit.cpp271
-rw-r--r--src/nodejs/unit-http/unit.h6
3 files changed, 224 insertions, 103 deletions
diff --git a/src/nodejs/unit-http/http_server.js b/src/nodejs/unit-http/http_server.js
index d378e410..e59296ae 100644
--- a/src/nodejs/unit-http/http_server.js
+++ b/src/nodejs/unit-http/http_server.js
@@ -11,6 +11,7 @@ const util = require('util');
const unit_lib = require('./build/Release/unit-http');
const Socket = require('./socket');
const WebSocketFrame = require('./websocket_frame');
+const Readable = require('stream').Readable;
function ServerResponse(req) {
@@ -23,6 +24,7 @@ function ServerResponse(req) {
req._response = this;
this.socket = req.socket;
this.connection = req.connection;
+ this.writable = true;
}
util.inherits(ServerResponse, EventEmitter);
@@ -268,6 +270,7 @@ ServerResponse.prototype._writeBody = function(chunk, encoding, callback) {
res = this._write(chunk, 0, contentLength);
if (res < contentLength) {
this.socket.writable = false;
+ this.writable = false;
o = new BufferedOutput(this, res, chunk, encoding, callback);
this.server._output.push(o);
@@ -328,6 +331,8 @@ ServerResponse.prototype.end = function end(chunk, encoding, callback) {
if (typeof callback === 'function') {
callback();
}
+
+ this.emit("finish");
});
this.finished = true;
@@ -337,15 +342,14 @@ ServerResponse.prototype.end = function end(chunk, encoding, callback) {
};
function ServerRequest(server, socket) {
- EventEmitter.call(this);
+ Readable.call(this);
this.server = server;
this.socket = socket;
this.connection = socket;
+ this._pushed_eofchunk = false;
}
-util.inherits(ServerRequest, EventEmitter);
-
-ServerRequest.prototype.unpipe = undefined;
+util.inherits(ServerRequest, Readable);
ServerRequest.prototype.setTimeout = function setTimeout(msecs, callback) {
this.timeout = msecs;
@@ -377,35 +381,21 @@ ServerRequest.prototype.STATUS_CODES = function STATUS_CODES() {
return http.STATUS_CODES;
};
-ServerRequest.prototype.listeners = function listeners() {
- return [];
-};
-
-ServerRequest.prototype.resume = function resume() {
- return [];
-};
+ServerRequest.prototype._request_read = unit_lib.request_read;
-/*
- * The "on" method is overridden to defer reading data until user code is
- * ready, that is (ev === "data"). This can occur after req.emit("end") is
- * executed, since the user code can be scheduled asynchronously by Promises
- * and so on. Passing the data is postponed by process.nextTick() until
- * the "on" method caller completes.
- */
-ServerRequest.prototype.on = function on(ev, fn) {
- Server.prototype.on.call(this, ev, fn);
+ServerRequest.prototype._read = function _read(n) {
+ const b = this._request_read(n);
- if (ev === "data") {
- process.nextTick(function () {
- if (this._data.length !== 0) {
- this.emit("data", this._data);
- }
+ if (b != null) {
+ this.push(b);
+ }
- }.bind(this));
+ if (!this._pushed_eofchunk && (b == null || b.length < n)) {
+ this._pushed_eofchunk = true;
+ this.push(null);
}
};
-ServerRequest.prototype.addListener = ServerRequest.prototype.on;
function Server(requestListener) {
EventEmitter.call(this);
@@ -472,11 +462,6 @@ Server.prototype.emit_request = function (req, res) {
} else {
this.emit("request", req, res);
}
-
- process.nextTick(() => {
- req.emit("finish");
- req.emit("end");
- });
};
Server.prototype.emit_close = function () {
@@ -523,6 +508,7 @@ Server.prototype.emit_drain = function () {
}
resp.socket.writable = true;
+ resp.writable = true;
process.nextTick(() => {
resp.emit("drain");
diff --git a/src/nodejs/unit-http/unit.cpp b/src/nodejs/unit-http/unit.cpp
index c5bca49a..589eca3f 100644
--- a/src/nodejs/unit-http/unit.cpp
+++ b/src/nodejs/unit-http/unit.cpp
@@ -13,25 +13,140 @@
#include <nxt_unit_websocket.h>
-static void delete_port_data(uv_handle_t* handle);
-
napi_ref Unit::constructor_;
struct port_data_t {
- nxt_unit_ctx_t *ctx;
- nxt_unit_port_t *port;
- uv_poll_t poll;
+ port_data_t(nxt_unit_ctx_t *c, nxt_unit_port_t *p);
+
+ void process_port_msg();
+ void stop();
+
+ template<typename T>
+ static port_data_t *get(T *handle);
+
+ static void read_callback(uv_poll_t *handle, int status, int events);
+ static void timer_callback(uv_timer_t *handle);
+ static void delete_data(uv_handle_t* handle);
+
+ nxt_unit_ctx_t *ctx;
+ nxt_unit_port_t *port;
+ uv_poll_t poll;
+ uv_timer_t timer;
+ int ref_count;
+ bool scheduled;
+ bool stopped;
};
struct req_data_t {
napi_ref sock_ref;
+ napi_ref req_ref;
napi_ref resp_ref;
napi_ref conn_ref;
};
+port_data_t::port_data_t(nxt_unit_ctx_t *c, nxt_unit_port_t *p) :
+ ctx(c), port(p), ref_count(0), scheduled(false), stopped(false)
+{
+ timer.type = UV_UNKNOWN_HANDLE;
+}
+
+
+void
+port_data_t::process_port_msg()
+{
+ int rc, err;
+
+ rc = nxt_unit_process_port_msg(ctx, port);
+
+ if (rc != NXT_UNIT_OK) {
+ return;
+ }
+
+ if (timer.type == UV_UNKNOWN_HANDLE) {
+ err = uv_timer_init(poll.loop, &timer);
+ if (err < 0) {
+ nxt_unit_warn(ctx, "Failed to init uv.poll");
+ return;
+ }
+
+ ref_count++;
+ timer.data = this;
+ }
+
+ if (!scheduled && !stopped) {
+ uv_timer_start(&timer, timer_callback, 0, 0);
+
+ scheduled = true;
+ }
+}
+
+
+void
+port_data_t::stop()
+{
+ stopped = true;
+
+ uv_poll_stop(&poll);
+
+ uv_close((uv_handle_t *) &poll, delete_data);
+
+ if (timer.type == UV_UNKNOWN_HANDLE) {
+ return;
+ }
+
+ uv_timer_stop(&timer);
+
+ uv_close((uv_handle_t *) &timer, delete_data);
+}
+
+
+template<typename T>
+port_data_t *
+port_data_t::get(T *handle)
+{
+ return (port_data_t *) handle->data;
+}
+
+
+void
+port_data_t::read_callback(uv_poll_t *handle, int status, int events)
+{
+ get(handle)->process_port_msg();
+}
+
+
+void
+port_data_t::timer_callback(uv_timer_t *handle)
+{
+ port_data_t *data;
+
+ data = get(handle);
+
+ data->scheduled = false;
+ if (data->stopped) {
+ return;
+ }
+
+ data->process_port_msg();
+}
+
+
+void
+port_data_t::delete_data(uv_handle_t* handle)
+{
+ port_data_t *data;
+
+ data = get(handle);
+
+ if (--data->ref_count <= 0) {
+ delete data;
+ }
+}
+
+
Unit::Unit(napi_env env, napi_value jsthis):
nxt_napi(env),
wrapper_(wrap(jsthis, this, destroy)),
@@ -65,6 +180,7 @@ Unit::init(napi_env env, napi_value exports)
constructor_ = napi.create_reference(ctor);
napi.set_named_property(exports, "Unit", ctor);
+ napi.set_named_property(exports, "request_read", request_read);
napi.set_named_property(exports, "response_send_headers",
response_send_headers);
napi.set_named_property(exports, "response_write", response_write);
@@ -206,7 +322,7 @@ Unit::request_handler(nxt_unit_request_info_t *req)
server_obj = get_server_object();
socket = create_socket(server_obj, req);
- request = create_request(server_obj, socket);
+ request = create_request(server_obj, socket, req);
response = create_response(server_obj, request, req);
create_headers(req, request);
@@ -301,6 +417,7 @@ Unit::close_handler(nxt_unit_request_info_t *req)
nxt_napi::create(0));
remove_wrap(req_data->sock_ref);
+ remove_wrap(req_data->req_ref);
remove_wrap(req_data->resp_ref);
remove_wrap(req_data->conn_ref);
@@ -350,59 +467,50 @@ Unit::shm_ack_handler(nxt_unit_ctx_t *ctx)
}
-static void
-nxt_uv_read_callback(uv_poll_t *handle, int status, int events)
-{
- port_data_t *data;
-
- data = (port_data_t *) handle->data;
-
- nxt_unit_process_port_msg(data->ctx, data->port);
-}
-
-
int
Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
{
- int err;
- Unit *obj;
- uv_loop_t *loop;
- port_data_t *data;
- napi_status status;
+ int err;
+ Unit *obj;
+ uv_loop_t *loop;
+ port_data_t *data;
+ napi_status status;
if (port->in_fd != -1) {
- obj = reinterpret_cast<Unit *>(ctx->unit->data);
-
if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) {
nxt_unit_warn(ctx, "fcntl(%d, O_NONBLOCK) failed: %s (%d)",
port->in_fd, strerror(errno), errno);
return -1;
}
+ obj = reinterpret_cast<Unit *>(ctx->unit->data);
+
status = napi_get_uv_event_loop(obj->env(), &loop);
if (status != napi_ok) {
nxt_unit_warn(ctx, "Failed to get uv.loop");
return NXT_UNIT_ERROR;
}
- data = new port_data_t;
+ data = new port_data_t(ctx, port);
err = uv_poll_init(loop, &data->poll, port->in_fd);
if (err < 0) {
nxt_unit_warn(ctx, "Failed to init uv.poll");
+ delete data;
return NXT_UNIT_ERROR;
}
- err = uv_poll_start(&data->poll, UV_READABLE, nxt_uv_read_callback);
+ err = uv_poll_start(&data->poll, UV_READABLE,
+ port_data_t::read_callback);
if (err < 0) {
nxt_unit_warn(ctx, "Failed to start uv.poll");
+ delete data;
return NXT_UNIT_ERROR;
}
port->data = data;
- data->ctx = ctx;
- data->port = port;
+ data->ref_count++;
data->poll.data = data;
}
@@ -418,26 +526,11 @@ Unit::remove_port(nxt_unit_t *unit, nxt_unit_port_t *port)
if (port->data != NULL) {
data = (port_data_t *) port->data;
- if (data->port == port) {
- uv_poll_stop(&data->poll);
-
- uv_close((uv_handle_t *) &data->poll, delete_port_data);
- }
+ data->stop();
}
}
-static void
-delete_port_data(uv_handle_t* handle)
-{
- port_data_t *data;
-
- data = (port_data_t *) handle->data;
-
- delete data;
-}
-
-
void
Unit::quit_cb(nxt_unit_ctx_t *ctx)
{
@@ -488,9 +581,8 @@ Unit::get_server_object()
void
Unit::create_headers(nxt_unit_request_info_t *req, napi_value request)
{
- void *data;
uint32_t i;
- napi_value headers, raw_headers, buffer;
+ napi_value headers, raw_headers;
napi_status status;
nxt_unit_request_t *r;
@@ -515,11 +607,6 @@ Unit::create_headers(nxt_unit_request_info_t *req, napi_value request)
set_named_property(request, "url", r->target, r->target_length);
set_named_property(request, "_websocket_handshake", r->websocket_handshake);
-
- buffer = create_buffer((size_t) req->content_length, &data);
- nxt_unit_request_read(req, data, req->content_length);
-
- set_named_property(request, "_data", buffer);
}
@@ -577,13 +664,20 @@ Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req)
napi_value
-Unit::create_request(napi_value server_obj, napi_value socket)
+Unit::create_request(napi_value server_obj, napi_value socket,
+ nxt_unit_request_info_t *req)
{
- napi_value constructor;
+ napi_value constructor, res;
+ req_data_t *req_data;
constructor = get_named_property(server_obj, "ServerRequest");
- return new_instance(constructor, server_obj, socket);
+ res = new_instance(constructor, server_obj, socket);
+
+ req_data = (req_data_t *) req->data;
+ req_data->req_ref = wrap(res, req, req_destroy);
+
+ return res;
}
@@ -643,6 +737,47 @@ Unit::create_websocket_frame(napi_value server_obj,
napi_value
+Unit::request_read(napi_env env, napi_callback_info info)
+{
+ void *data;
+ uint32_t wm;
+ nxt_napi napi(env);
+ napi_value this_arg, argv, buffer;
+ nxt_unit_request_info_t *req;
+
+ try {
+ this_arg = napi.get_cb_info(info, argv);
+
+ try {
+ req = napi.get_request_info(this_arg);
+
+ } catch (exception &e) {
+ return nullptr;
+ }
+
+ if (req->content_length == 0) {
+ return nullptr;
+ }
+
+ wm = napi.get_value_uint32(argv);
+
+ if (wm > req->content_length) {
+ wm = req->content_length;
+ }
+
+ buffer = napi.create_buffer((size_t) wm, &data);
+ nxt_unit_request_read(req, data, wm);
+
+ } catch (exception &e) {
+ napi.throw_error(e);
+ return nullptr;
+ }
+
+ return buffer;
+}
+
+
+napi_value
Unit::response_send_headers(napi_env env, napi_callback_info info)
{
int ret;
@@ -884,6 +1019,7 @@ Unit::response_end(napi_env env, napi_callback_info info)
req_data = (req_data_t *) req->data;
napi.remove_wrap(req_data->sock_ref);
+ napi.remove_wrap(req_data->req_ref);
napi.remove_wrap(req_data->resp_ref);
napi.remove_wrap(req_data->conn_ref);
@@ -998,33 +1134,28 @@ Unit::websocket_set_sock(napi_env env, napi_callback_info info)
void
-Unit::conn_destroy(napi_env env, void *nativeObject, void *finalize_hint)
+Unit::conn_destroy(napi_env env, void *r, void *finalize_hint)
{
- nxt_unit_request_info_t *req;
-
- req = (nxt_unit_request_info_t *) nativeObject;
-
- nxt_unit_warn(NULL, "conn_destroy: %p", req);
+ nxt_unit_req_debug(NULL, "conn_destroy: %p", r);
}
void
-Unit::sock_destroy(napi_env env, void *nativeObject, void *finalize_hint)
+Unit::sock_destroy(napi_env env, void *r, void *finalize_hint)
{
- nxt_unit_request_info_t *req;
-
- req = (nxt_unit_request_info_t *) nativeObject;
-
- nxt_unit_warn(NULL, "sock_destroy: %p", req);
+ nxt_unit_req_debug(NULL, "sock_destroy: %p", r);
}
void
-Unit::resp_destroy(napi_env env, void *nativeObject, void *finalize_hint)
+Unit::req_destroy(napi_env env, void *r, void *finalize_hint)
{
- nxt_unit_request_info_t *req;
+ nxt_unit_req_debug(NULL, "req_destroy: %p", r);
+}
- req = (nxt_unit_request_info_t *) nativeObject;
- nxt_unit_warn(NULL, "resp_destroy: %p", req);
+void
+Unit::resp_destroy(napi_env env, void *r, void *finalize_hint)
+{
+ nxt_unit_req_debug(NULL, "resp_destroy: %p", r);
}
diff --git a/src/nodejs/unit-http/unit.h b/src/nodejs/unit-http/unit.h
index 07823c26..4ef40d45 100644
--- a/src/nodejs/unit-http/unit.h
+++ b/src/nodejs/unit-http/unit.h
@@ -21,6 +21,7 @@ private:
static void destroy(napi_env env, void *nativeObject, void *finalize_hint);
static void conn_destroy(napi_env env, void *nativeObject, void *finalize_hint);
static void sock_destroy(napi_env env, void *nativeObject, void *finalize_hint);
+ static void req_destroy(napi_env env, void *nativeObject, void *finalize_hint);
static void resp_destroy(napi_env env, void *nativeObject, void *finalize_hint);
static napi_value create_server(napi_env env, napi_callback_info info);
@@ -50,7 +51,8 @@ private:
napi_value create_socket(napi_value server_obj,
nxt_unit_request_info_t *req);
- napi_value create_request(napi_value server_obj, napi_value socket);
+ napi_value create_request(napi_value server_obj, napi_value socket,
+ nxt_unit_request_info_t *req);
napi_value create_response(napi_value server_obj, napi_value request,
nxt_unit_request_info_t *req);
@@ -58,6 +60,8 @@ private:
napi_value create_websocket_frame(napi_value server_obj,
nxt_unit_websocket_frame_t *ws);
+ static napi_value request_read(napi_env env, napi_callback_info info);
+
static napi_value response_send_headers(napi_env env,
napi_callback_info info);