diff options
author | Max Romanov <max.romanov@nginx.com> | 2019-12-24 18:04:17 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2019-12-24 18:04:17 +0300 |
commit | 763bdff4018ec35de8383273d366160adebb6021 (patch) | |
tree | 69da5e8b14c436142a84717851edbd00d84b0575 /src/nodejs/unit-http/unit.cpp | |
parent | df7caf465072e171f88358b9e69c65b76d8efd25 (diff) | |
download | unit-763bdff4018ec35de8383273d366160adebb6021.tar.gz unit-763bdff4018ec35de8383273d366160adebb6021.tar.bz2 |
Node.js: implementing output message drain using SHM_ACK feature.
ServerResponse.write() method tries to write data buffer using libunit
and stores buffers to write in a Server-wide output queue, which is
processed in response to SHM_ACK message from router.
As a side effect 'drain' event implemented and socket.writable flag
reflect current state.
Diffstat (limited to '')
-rw-r--r-- | src/nodejs/unit-http/unit.cpp | 90 |
1 files changed, 74 insertions, 16 deletions
diff --git a/src/nodejs/unit-http/unit.cpp b/src/nodejs/unit-http/unit.cpp index 10875703..1fa73689 100644 --- a/src/nodejs/unit-http/unit.cpp +++ b/src/nodejs/unit-http/unit.cpp @@ -70,6 +70,8 @@ Unit::init(napi_env env, napi_value exports) websocket_send_frame); napi.set_named_property(exports, "websocket_set_sock", websocket_set_sock); + napi.set_named_property(exports, "buf_min", nxt_unit_buf_min()); + napi.set_named_property(exports, "buf_max", nxt_unit_buf_max()); } catch (exception &e) { napi.throw_error(e); @@ -148,6 +150,7 @@ Unit::create_server(napi_env env, napi_callback_info info) unit_init.callbacks.request_handler = request_handler_cb; unit_init.callbacks.websocket_handler = websocket_handler_cb; unit_init.callbacks.close_handler = close_handler_cb; + unit_init.callbacks.shm_ack_handler = shm_ack_handler_cb; unit_init.callbacks.add_port = add_port; unit_init.callbacks.remove_port = remove_port; unit_init.callbacks.quit = quit_cb; @@ -308,6 +311,40 @@ Unit::close_handler(nxt_unit_request_info_t *req) } +void +Unit::shm_ack_handler_cb(nxt_unit_ctx_t *ctx) +{ + Unit *obj; + + obj = reinterpret_cast<Unit *>(ctx->unit->data); + + obj->shm_ack_handler(ctx); +} + + +void +Unit::shm_ack_handler(nxt_unit_ctx_t *ctx) +{ + napi_value server_obj, emit_drain; + + try { + nxt_handle_scope scope(env()); + + server_obj = get_server_object(); + + emit_drain = get_named_property(server_obj, "emit_drain"); + + nxt_async_context async_context(env(), "shm_ack_handler"); + nxt_callback_scope async_scope(async_context); + + make_callback(async_context, server_obj, emit_drain); + + } catch (exception &e) { + nxt_unit_warn(ctx, "shm_ack_handler: %s", e.str); + } +} + + static void nxt_uv_read_callback(uv_poll_t *handle, int status, int events) { @@ -748,47 +785,68 @@ Unit::response_write(napi_env env, napi_callback_info info) int ret; void *ptr; size_t argc, have_buf_len; - uint32_t buf_len; + ssize_t res_len; + uint32_t buf_start, buf_len; nxt_napi napi(env); napi_value this_arg; nxt_unit_buf_t *buf; napi_valuetype buf_type; nxt_unit_request_info_t *req; - napi_value argv[2]; + napi_value argv[3]; - argc = 2; + argc = 3; try { this_arg = napi.get_cb_info(info, argc, argv); - if (argc != 2) { + if (argc != 3) { throw exception("Wrong args count. Expected: " - "chunk, chunk length"); + "chunk, start, length"); } req = napi.get_request_info(this_arg); buf_type = napi.type_of(argv[0]); - buf_len = napi.get_value_uint32(argv[1]) + 1; - - buf = nxt_unit_response_buf_alloc(req, buf_len); - if (buf == NULL) { - throw exception("Failed to allocate response buffer"); - } + buf_start = napi.get_value_uint32(argv[1]); + buf_len = napi.get_value_uint32(argv[2]) + 1; if (buf_type == napi_string) { /* TODO: will work only for utf8 content-type */ + if (req->response_buf != NULL + && (req->response_buf->end - req->response_buf->free) + >= buf_len) + { + buf = req->response_buf; + + } else { + buf = nxt_unit_response_buf_alloc(req, buf_len); + if (buf == NULL) { + throw exception("Failed to allocate response buffer"); + } + } + have_buf_len = napi.get_value_string_utf8(argv[0], buf->free, buf_len); + buf->free += have_buf_len; + + ret = nxt_unit_buf_send(buf); + if (ret == NXT_UNIT_OK) { + res_len = have_buf_len; + } + } else { ptr = napi.get_buffer_info(argv[0], have_buf_len); - memcpy(buf->free, ptr, have_buf_len); - } + if (buf_start > 0) { + ptr = ((uint8_t *) ptr) + buf_start; + have_buf_len -= buf_start; + } - buf->free += have_buf_len; + res_len = nxt_unit_response_write_nb(req, ptr, have_buf_len, 0); + + ret = res_len < 0 ? -res_len : NXT_UNIT_OK; + } - ret = nxt_unit_buf_send(buf); if (ret != NXT_UNIT_OK) { throw exception("Failed to send body buf"); } @@ -797,7 +855,7 @@ Unit::response_write(napi_env env, napi_callback_info info) return nullptr; } - return this_arg; + return napi.create((int64_t) res_len); } |