summaryrefslogtreecommitdiffhomepage
path: root/src/nodejs
diff options
context:
space:
mode:
Diffstat (limited to 'src/nodejs')
-rw-r--r--src/nodejs/unit-http/unit.cpp176
1 files changed, 133 insertions, 43 deletions
diff --git a/src/nodejs/unit-http/unit.cpp b/src/nodejs/unit-http/unit.cpp
index 67b4377d..589eca3f 100644
--- a/src/nodejs/unit-http/unit.cpp
+++ b/src/nodejs/unit-http/unit.cpp
@@ -13,15 +13,29 @@
#include <nxt_unit_websocket.h>
-static void delete_port_data(uv_handle_t* handle);
-
napi_ref Unit::constructor_;
struct port_data_t {
- nxt_unit_ctx_t *ctx;
- nxt_unit_port_t *port;
- uv_poll_t poll;
+ port_data_t(nxt_unit_ctx_t *c, nxt_unit_port_t *p);
+
+ void process_port_msg();
+ void stop();
+
+ template<typename T>
+ static port_data_t *get(T *handle);
+
+ static void read_callback(uv_poll_t *handle, int status, int events);
+ static void timer_callback(uv_timer_t *handle);
+ static void delete_data(uv_handle_t* handle);
+
+ nxt_unit_ctx_t *ctx;
+ nxt_unit_port_t *port;
+ uv_poll_t poll;
+ uv_timer_t timer;
+ int ref_count;
+ bool scheduled;
+ bool stopped;
};
@@ -33,6 +47,106 @@ struct req_data_t {
};
+port_data_t::port_data_t(nxt_unit_ctx_t *c, nxt_unit_port_t *p) :
+ ctx(c), port(p), ref_count(0), scheduled(false), stopped(false)
+{
+ timer.type = UV_UNKNOWN_HANDLE;
+}
+
+
+void
+port_data_t::process_port_msg()
+{
+ int rc, err;
+
+ rc = nxt_unit_process_port_msg(ctx, port);
+
+ if (rc != NXT_UNIT_OK) {
+ return;
+ }
+
+ if (timer.type == UV_UNKNOWN_HANDLE) {
+ err = uv_timer_init(poll.loop, &timer);
+ if (err < 0) {
+ nxt_unit_warn(ctx, "Failed to init uv.poll");
+ return;
+ }
+
+ ref_count++;
+ timer.data = this;
+ }
+
+ if (!scheduled && !stopped) {
+ uv_timer_start(&timer, timer_callback, 0, 0);
+
+ scheduled = true;
+ }
+}
+
+
+void
+port_data_t::stop()
+{
+ stopped = true;
+
+ uv_poll_stop(&poll);
+
+ uv_close((uv_handle_t *) &poll, delete_data);
+
+ if (timer.type == UV_UNKNOWN_HANDLE) {
+ return;
+ }
+
+ uv_timer_stop(&timer);
+
+ uv_close((uv_handle_t *) &timer, delete_data);
+}
+
+
+template<typename T>
+port_data_t *
+port_data_t::get(T *handle)
+{
+ return (port_data_t *) handle->data;
+}
+
+
+void
+port_data_t::read_callback(uv_poll_t *handle, int status, int events)
+{
+ get(handle)->process_port_msg();
+}
+
+
+void
+port_data_t::timer_callback(uv_timer_t *handle)
+{
+ port_data_t *data;
+
+ data = get(handle);
+
+ data->scheduled = false;
+ if (data->stopped) {
+ return;
+ }
+
+ data->process_port_msg();
+}
+
+
+void
+port_data_t::delete_data(uv_handle_t* handle)
+{
+ port_data_t *data;
+
+ data = get(handle);
+
+ if (--data->ref_count <= 0) {
+ delete data;
+ }
+}
+
+
Unit::Unit(napi_env env, napi_value jsthis):
nxt_napi(env),
wrapper_(wrap(jsthis, this, destroy)),
@@ -353,59 +467,50 @@ Unit::shm_ack_handler(nxt_unit_ctx_t *ctx)
}
-static void
-nxt_uv_read_callback(uv_poll_t *handle, int status, int events)
-{
- port_data_t *data;
-
- data = (port_data_t *) handle->data;
-
- nxt_unit_process_port_msg(data->ctx, data->port);
-}
-
-
int
Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
{
- int err;
- Unit *obj;
- uv_loop_t *loop;
- port_data_t *data;
- napi_status status;
+ int err;
+ Unit *obj;
+ uv_loop_t *loop;
+ port_data_t *data;
+ napi_status status;
if (port->in_fd != -1) {
- obj = reinterpret_cast<Unit *>(ctx->unit->data);
-
if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) {
nxt_unit_warn(ctx, "fcntl(%d, O_NONBLOCK) failed: %s (%d)",
port->in_fd, strerror(errno), errno);
return -1;
}
+ obj = reinterpret_cast<Unit *>(ctx->unit->data);
+
status = napi_get_uv_event_loop(obj->env(), &loop);
if (status != napi_ok) {
nxt_unit_warn(ctx, "Failed to get uv.loop");
return NXT_UNIT_ERROR;
}
- data = new port_data_t;
+ data = new port_data_t(ctx, port);
err = uv_poll_init(loop, &data->poll, port->in_fd);
if (err < 0) {
nxt_unit_warn(ctx, "Failed to init uv.poll");
+ delete data;
return NXT_UNIT_ERROR;
}
- err = uv_poll_start(&data->poll, UV_READABLE, nxt_uv_read_callback);
+ err = uv_poll_start(&data->poll, UV_READABLE,
+ port_data_t::read_callback);
if (err < 0) {
nxt_unit_warn(ctx, "Failed to start uv.poll");
+ delete data;
return NXT_UNIT_ERROR;
}
port->data = data;
- data->ctx = ctx;
- data->port = port;
+ data->ref_count++;
data->poll.data = data;
}
@@ -421,26 +526,11 @@ Unit::remove_port(nxt_unit_t *unit, nxt_unit_port_t *port)
if (port->data != NULL) {
data = (port_data_t *) port->data;
- if (data->port == port) {
- uv_poll_stop(&data->poll);
-
- uv_close((uv_handle_t *) &data->poll, delete_port_data);
- }
+ data->stop();
}
}
-static void
-delete_port_data(uv_handle_t* handle)
-{
- port_data_t *data;
-
- data = (port_data_t *) handle->data;
-
- delete data;
-}
-
-
void
Unit::quit_cb(nxt_unit_ctx_t *ctx)
{