diff options
Diffstat (limited to '')
-rw-r--r-- | src/nodejs/unit-http/unit.cpp | 176 |
1 files changed, 133 insertions, 43 deletions
diff --git a/src/nodejs/unit-http/unit.cpp b/src/nodejs/unit-http/unit.cpp index 67b4377d..589eca3f 100644 --- a/src/nodejs/unit-http/unit.cpp +++ b/src/nodejs/unit-http/unit.cpp @@ -13,15 +13,29 @@ #include <nxt_unit_websocket.h> -static void delete_port_data(uv_handle_t* handle); - napi_ref Unit::constructor_; struct port_data_t { - nxt_unit_ctx_t *ctx; - nxt_unit_port_t *port; - uv_poll_t poll; + port_data_t(nxt_unit_ctx_t *c, nxt_unit_port_t *p); + + void process_port_msg(); + void stop(); + + template<typename T> + static port_data_t *get(T *handle); + + static void read_callback(uv_poll_t *handle, int status, int events); + static void timer_callback(uv_timer_t *handle); + static void delete_data(uv_handle_t* handle); + + nxt_unit_ctx_t *ctx; + nxt_unit_port_t *port; + uv_poll_t poll; + uv_timer_t timer; + int ref_count; + bool scheduled; + bool stopped; }; @@ -33,6 +47,106 @@ struct req_data_t { }; +port_data_t::port_data_t(nxt_unit_ctx_t *c, nxt_unit_port_t *p) : + ctx(c), port(p), ref_count(0), scheduled(false), stopped(false) +{ + timer.type = UV_UNKNOWN_HANDLE; +} + + +void +port_data_t::process_port_msg() +{ + int rc, err; + + rc = nxt_unit_process_port_msg(ctx, port); + + if (rc != NXT_UNIT_OK) { + return; + } + + if (timer.type == UV_UNKNOWN_HANDLE) { + err = uv_timer_init(poll.loop, &timer); + if (err < 0) { + nxt_unit_warn(ctx, "Failed to init uv.poll"); + return; + } + + ref_count++; + timer.data = this; + } + + if (!scheduled && !stopped) { + uv_timer_start(&timer, timer_callback, 0, 0); + + scheduled = true; + } +} + + +void +port_data_t::stop() +{ + stopped = true; + + uv_poll_stop(&poll); + + uv_close((uv_handle_t *) &poll, delete_data); + + if (timer.type == UV_UNKNOWN_HANDLE) { + return; + } + + uv_timer_stop(&timer); + + uv_close((uv_handle_t *) &timer, delete_data); +} + + +template<typename T> +port_data_t * +port_data_t::get(T *handle) +{ + return (port_data_t *) handle->data; +} + + +void +port_data_t::read_callback(uv_poll_t *handle, int status, int events) +{ + get(handle)->process_port_msg(); +} + + +void +port_data_t::timer_callback(uv_timer_t *handle) +{ + port_data_t *data; + + data = get(handle); + + data->scheduled = false; + if (data->stopped) { + return; + } + + data->process_port_msg(); +} + + +void +port_data_t::delete_data(uv_handle_t* handle) +{ + port_data_t *data; + + data = get(handle); + + if (--data->ref_count <= 0) { + delete data; + } +} + + Unit::Unit(napi_env env, napi_value jsthis): nxt_napi(env), wrapper_(wrap(jsthis, this, destroy)), @@ -353,59 +467,50 @@ Unit::shm_ack_handler(nxt_unit_ctx_t *ctx) } -static void -nxt_uv_read_callback(uv_poll_t *handle, int status, int events) -{ - port_data_t *data; - - data = (port_data_t *) handle->data; - - nxt_unit_process_port_msg(data->ctx, data->port); -} - - int Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) { - int err; - Unit *obj; - uv_loop_t *loop; - port_data_t *data; - napi_status status; + int err; + Unit *obj; + uv_loop_t *loop; + port_data_t *data; + napi_status status; if (port->in_fd != -1) { - obj = reinterpret_cast<Unit *>(ctx->unit->data); - if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) { nxt_unit_warn(ctx, "fcntl(%d, O_NONBLOCK) failed: %s (%d)", port->in_fd, strerror(errno), errno); return -1; } + obj = reinterpret_cast<Unit *>(ctx->unit->data); + status = napi_get_uv_event_loop(obj->env(), &loop); if (status != napi_ok) { nxt_unit_warn(ctx, "Failed to get uv.loop"); return NXT_UNIT_ERROR; } - data = new port_data_t; + data = new port_data_t(ctx, port); err = uv_poll_init(loop, &data->poll, port->in_fd); if (err < 0) { nxt_unit_warn(ctx, "Failed to init uv.poll"); + delete data; return NXT_UNIT_ERROR; } - err = uv_poll_start(&data->poll, UV_READABLE, nxt_uv_read_callback); + err = uv_poll_start(&data->poll, UV_READABLE, + port_data_t::read_callback); if (err < 0) { nxt_unit_warn(ctx, "Failed to start uv.poll"); + delete data; return NXT_UNIT_ERROR; } port->data = data; - data->ctx = ctx; - data->port = port; + data->ref_count++; data->poll.data = data; } @@ -421,26 +526,11 @@ Unit::remove_port(nxt_unit_t *unit, nxt_unit_port_t *port) if (port->data != NULL) { data = (port_data_t *) port->data; - if (data->port == port) { - uv_poll_stop(&data->poll); - - uv_close((uv_handle_t *) &data->poll, delete_port_data); - } + data->stop(); } } -static void -delete_port_data(uv_handle_t* handle) -{ - port_data_t *data; - - data = (port_data_t *) handle->data; - - delete data; -} - - void Unit::quit_cb(nxt_unit_ctx_t *ctx) { |