diff options
Diffstat (limited to 'src')
36 files changed, 1792 insertions, 253 deletions
diff --git a/src/go/unit/nxt_cgo_lib.c b/src/go/unit/nxt_cgo_lib.c index 9c080730..172bef88 100644 --- a/src/go/unit/nxt_cgo_lib.c +++ b/src/go/unit/nxt_cgo_lib.c @@ -203,5 +203,5 @@ nxt_cgo_request_done(uintptr_t req, int res) void nxt_cgo_warn(uintptr_t msg, uint32_t msg_len) { - nxt_unit_warn(NULL, ".*s", (int) msg_len, (char *) msg); + nxt_unit_warn(NULL, "%.*s", (int) msg_len, (char *) msg); } diff --git a/src/go/unit/port.go b/src/go/unit/port.go index f716c9ec..a68cae74 100644 --- a/src/go/unit/port.go +++ b/src/go/unit/port.go @@ -163,7 +163,7 @@ func nxt_go_port_recv(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int, GoBytes(oob, oob_size)) if err != nil { - nxt_go_warn("write result %d (%d), %s", n, oobn, err) + nxt_go_warn("read result %d (%d), %s", n, oobn, err) } return C.ssize_t(n) diff --git a/src/nodejs/unit-http/README.md b/src/nodejs/unit-http/README.md new file mode 100644 index 00000000..71a4067a --- /dev/null +++ b/src/nodejs/unit-http/README.md @@ -0,0 +1,21 @@ +# Node.js Package for NGINX Unit + +[<img src="https://unit.nginx.org/_static/logo.svg" width=150px>](https://unit.nginx.org) +Node.js support package for NGINX Unit. +For details, see [NGINX Unit documentation](https://unit.nginx.org). + +## Installation + +```bash +npm i unit-http +``` + +## Usage + +```javascript +var http = require('unit-http'); +``` + +## License + +Apache 2.0 diff --git a/src/nodejs/unit-http/addon.cpp b/src/nodejs/unit-http/addon.cpp new file mode 100644 index 00000000..6ced9538 --- /dev/null +++ b/src/nodejs/unit-http/addon.cpp @@ -0,0 +1,15 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + +#include "unit.h" + + +napi_value +Init(napi_env env, napi_value exports) +{ + return Unit::init(env, exports); +} + +NAPI_MODULE(Unit, Init) diff --git a/src/nodejs/unit-http/binding.gyp b/src/nodejs/unit-http/binding.gyp new file mode 100644 index 00000000..171c2eb7 --- /dev/null +++ b/src/nodejs/unit-http/binding.gyp @@ -0,0 +1,12 @@ +{ + 'targets': [{ + 'target_name': "unit-http", + 'sources': ["unit.cpp", "addon.cpp"], + 'include_dirs': [ + "<!(echo $UNIT_SRC_PATH)" + ], + 'libraries': [ + "<!(echo $UNIT_LIB_STATIC_PATH)" + ] + }] +} diff --git a/src/nodejs/unit-http/binding_pub.gyp b/src/nodejs/unit-http/binding_pub.gyp new file mode 100644 index 00000000..6fe3d9bc --- /dev/null +++ b/src/nodejs/unit-http/binding_pub.gyp @@ -0,0 +1,7 @@ +{ + 'targets': [{ + 'target_name': "unit-http", + 'sources': ["unit.cpp", "addon.cpp"], + 'libraries': ["-lunit"] + }] +} diff --git a/src/nodejs/unit-http/http.js b/src/nodejs/unit-http/http.js new file mode 100755 index 00000000..3a25fa2f --- /dev/null +++ b/src/nodejs/unit-http/http.js @@ -0,0 +1,23 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + +'use strict'; + +const server = require('unit-http/http_server'); + +const { Server } = server; + +function createServer (requestHandler) { + return new Server(requestHandler); +} + + +module.exports = { + Server, + STATUS_CODES: server.STATUS_CODES, + createServer, + IncomingMessage: server.ServerRequest, + ServerResponse: server.ServerResponse +}; diff --git a/src/nodejs/unit-http/http_server.js b/src/nodejs/unit-http/http_server.js new file mode 100755 index 00000000..fa7b8e9b --- /dev/null +++ b/src/nodejs/unit-http/http_server.js @@ -0,0 +1,331 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + +'use strict'; + +const EventEmitter = require('events'); +const http = require('http'); +const util = require('util'); +const unit_lib = require('unit-http/build/Release/unit-http.node'); +const unit_socket = require('unit-http/socket'); + +const { Socket } = unit_socket; + + +function ServerResponse(req) { + EventEmitter.call(this); + + this.headers = {}; +} +util.inherits(ServerResponse, EventEmitter); + +ServerResponse.prototype.statusCode = 200; +ServerResponse.prototype.statusMessage = undefined; +ServerResponse.prototype.headers_len = 0; +ServerResponse.prototype.headers_count = 0; +ServerResponse.prototype.headersSent = false; +ServerResponse.prototype.finished = false; + +ServerResponse.prototype._finish = function _finish() { + this.headers = {}; + this.headers_len = 0; + this.headers_count = 0; + this.finished = true; +}; + +ServerResponse.prototype.assignSocket = function assignSocket(socket) { +}; + +ServerResponse.prototype.detachSocket = function detachSocket(socket) { +}; + +ServerResponse.prototype.writeContinue = function writeContinue(cb) { +}; + +ServerResponse.prototype.writeProcessing = function writeProcessing(cb) { +}; + +ServerResponse.prototype.setHeader = function setHeader(key, value) { + if (typeof key !== 'string') { + throw new TypeError('Key argument must be a string'); + } + + let header_key_len = Buffer.byteLength(key, 'latin1'); + let header_len = 0 + let header_count = 0; + + if (Array.isArray(value)) { + header_count = value.length; + + value.forEach(function(val) { + if (typeof val !== 'string' && typeof val !== 'number') { + throw new TypeError('Array entries must be string or number'); + } + + header_len += Buffer.byteLength(val + "", 'latin1'); + }); + + } else { + if (typeof value !== 'string' && typeof value !== 'number') { + throw new TypeError('Value argument must be string, number, or array'); + } + + header_count = 1; + header_len = Buffer.byteLength(value + "", 'latin1'); + } + + this.removeHeader(key); + + this.headers[key] = value + ""; + this.headers_len += header_len + (header_key_len * header_count); + this.headers_count += header_count; +}; + +ServerResponse.prototype.getHeader = function getHeader(name) { + return this.headers[name]; +}; + +ServerResponse.prototype.getHeaderNames = function getHeaderNames() { + return Object.keys(this.headers); +}; + +ServerResponse.prototype.getHeaders = function getHeaders() { + return this.headers; +}; + +ServerResponse.prototype.hasHeader = function hasHeader(name) { + return name in this.headers; +}; + +ServerResponse.prototype.removeHeader = function removeHeader(name) { + if (!(name in this.headers)) { + return; + } + + let name_len = Buffer.byteLength(name + "", 'latin1'); + + if (Array.isArray(this.headers[name])) { + this.headers_count -= this.headers[name].length; + this.headers_len -= this.headers[name].length * name_len; + + this.headers[name].forEach(function(val) { + this.headers_len -= Buffer.byteLength(val + "", 'latin1'); + }); + + } else { + this.headers_count--; + this.headers_len -= name_len + Buffer.byteLength(this.headers[name] + "", 'latin1'); + } + + delete this.headers[name]; +}; + +ServerResponse.prototype.sendDate = function sendDate() { + throw new Error("Not supported"); +}; + +ServerResponse.prototype.setTimeout = function setTimeout(msecs, callback) { + this.timeout = msecs; + + if (callback) { + this.on('timeout', callback); + } + + return this; +}; + +// for Express +ServerResponse.prototype._implicitHeader = function _implicitHeader() { + this.writeHead(this.statusCode); +}; + +ServerResponse.prototype.writeHead = writeHead; +ServerResponse.prototype.writeHeader = ServerResponse.prototype.writeHead; + +function writeHead(statusCode, reason, obj) { + var originalStatusCode = statusCode; + + statusCode |= 0; + + if (statusCode < 100 || statusCode > 999) { + throw new ERR_HTTP_INVALID_STATUS_CODE(originalStatusCode); + } + + if (typeof reason === 'string') { + this.statusMessage = reason; + + } else { + if (!this.statusMessage) { + this.statusMessage = http.STATUS_CODES[statusCode] || 'unknown'; + } + + obj = reason; + } + + this.statusCode = statusCode; + + if (obj) { + var k; + var keys = Object.keys(obj); + + for (var i = 0; i < keys.length; i++) { + k = keys[i]; + + if (k) { + this.setHeader(k, obj[k]); + } + } + } + + unit_lib.unit_response_headers(this, statusCode, this.headers, this.headers_count, this.headers_len); + + this.headersSent = true; +}; + +ServerResponse.prototype._writeBody = function(chunk, encoding, callback) { + var contentLength = 0; + + if (!this.headersSent) { + this.writeHead(this.statusCode); + } + + if (this.finished) { + return this; + } + + if (typeof chunk === 'function') { + callback = chunk; + chunk = null; + + } else if (typeof encoding === 'function') { + callback = encoding; + encoding = null; + } + + if (chunk) { + if (typeof chunk !== 'string' && !(chunk instanceof Buffer)) { + throw new TypeError('First argument must be a string or Buffer'); + } + + if (typeof chunk === 'string') { + contentLength = Buffer.byteLength(chunk, encoding); + + } else { + contentLength = chunk.length; + } + + unit_lib.unit_response_write(this, chunk, contentLength); + } + + if (typeof callback === 'function') { + callback(this); + } +}; + +ServerResponse.prototype.write = function write(chunk, encoding, callback) { + this._writeBody(chunk, encoding, callback); + + return this; +}; + +ServerResponse.prototype.end = function end(chunk, encoding, callback) { + this._writeBody(chunk, encoding, callback); + unit_lib.unit_response_end(this) + + this.finished = true; + + return this; +}; + +function ServerRequest(server) { + EventEmitter.call(this); + + this.server = server; +} +util.inherits(ServerRequest, EventEmitter); + +ServerRequest.prototype.unpipe = undefined; + +ServerRequest.prototype.setTimeout = function setTimeout(msecs, callback) { + this.timeout = msecs; + + if (callback) { + this.on('timeout', callback); + } + + return this; +}; + +ServerRequest.prototype.statusCode = function statusCode() { + /* Only valid for response obtained from http.ClientRequest. */ +}; + +ServerRequest.prototype.statusMessage = function statusMessage() { + /* Only valid for response obtained from http.ClientRequest. */ +}; + +ServerRequest.prototype.trailers = function trailers() { + throw new Error("Not supported"); +}; + +ServerRequest.prototype.METHODS = function METHODS() { + return http.METHODS; +}; + +ServerRequest.prototype.STATUS_CODES = function STATUS_CODES() { + return http.STATUS_CODES; +}; + +ServerRequest.prototype.listeners = function listeners() { + return []; +}; + +ServerRequest.prototype.resume = function resume() { + return []; +}; + +function Server(requestListener) { + EventEmitter.call(this); + + this.unit = new unit_lib.Unit(); + this.unit.createServer(); + + this.unit.server = this; + + this.socket = Socket; + this.request = ServerRequest; + this.response = ServerResponse; + + if (requestListener) { + this.on('request', requestListener); + } +} +util.inherits(Server, EventEmitter); + +Server.prototype.setTimeout = function setTimeout(msecs, callback) { + this.timeout = msecs; + + if (callback) { + this.on('timeout', callback); + } + + return this; +}; + +Server.prototype.listen = function () { + this.unit.listen(); +}; + +function connectionListener(socket) { +} + + +module.exports = { + STATUS_CODES: http.STATUS_CODES, + Server, + ServerResponse, + ServerRequest, + _connectionListener: connectionListener +}; diff --git a/src/nodejs/unit-http/package.json b/src/nodejs/unit-http/package.json new file mode 100644 index 00000000..3a15d573 --- /dev/null +++ b/src/nodejs/unit-http/package.json @@ -0,0 +1,29 @@ +{ + "name": "unit-http", + "version": "1.0.0", + "description": "HTTP module for NGINX Unit", + "main": "http.js", + "files": [ + "addon.cpp", + "binding.gyp", + "http_server.js", + "http.js", + "package.json", + "socket.js", + "unit.cpp", + "unit.h", + "README.md" + ], + "scripts": { + "clean": "node-gyp clean", + "configure": "node-gyp configure", + "build": "node-gyp build", + "install": "node-gyp configure build" + }, + "author": "Alexander Borisov", + "license": "Apache-2.0", + "gypfile": true, + "dependencies": { + "node-addon-api": "1.2.0" + } +} diff --git a/src/nodejs/unit-http/socket.js b/src/nodejs/unit-http/socket.js new file mode 100755 index 00000000..89702834 --- /dev/null +++ b/src/nodejs/unit-http/socket.js @@ -0,0 +1,99 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + +'use strict'; + +const EventEmitter = require('events'); +const util = require('util'); +const unit_lib = require('unit-http/build/Release/unit-http.node'); + +function Socket(options) { + EventEmitter.call(this); + + if (typeof options === 'number') { + options = { fd: options }; + + } else if (options === undefined) { + options = {}; + } + + this.readable = options.readable !== false; + this.writable = options.writable !== false; +} +util.inherits(Socket, EventEmitter); + +Socket.prototype.bufferSize = 0; +Socket.prototype.bytesRead = 0; +Socket.prototype.bytesWritten = 0; +Socket.prototype.connecting = false; +Socket.prototype.destroyed = false; +Socket.prototype.localAddress = ""; +Socket.prototype.localPort = 0; +Socket.prototype.remoteAddress = ""; +Socket.prototype.remoteFamily = ""; +Socket.prototype.remotePort = 0; + +Socket.prototype.address = function address() { +}; + +Socket.prototype.connect = function connect(options, callback) { + if (callback !== null) { + this.once('connect', cb); + } + + this.connecting = true; + this.writable = true; +}; + +Socket.prototype.address = function address() { +}; + +Socket.prototype.destroy = function destroy(exception) { + this.connecting = false; + this.readable = false; + this.writable = false; +}; + +Socket.prototype.end = function end(data, encoding) { +}; + +Socket.prototype.pause = function pause() { +}; + +Socket.prototype.ref = function ref() { +}; + +Socket.prototype.resume = function resume() { +}; + +Socket.prototype.setEncoding = function setEncoding(encoding) { +}; + +Socket.prototype.setKeepAlive = function setKeepAlive(enable, initialDelay) { +}; + +Socket.prototype.setNoDelay = function setNoDelay(noDelay) { +}; + +Socket.prototype.setTimeout = function setTimeout(msecs, callback) { + this.timeout = msecs; + + if (callback) { + this.on('timeout', callback); + } + + return this; +}; + +Socket.prototype.unref = function unref() { +}; + +Socket.prototype.write = function write(data, encoding, callback) { +}; + + +module.exports = { + Socket +}; diff --git a/src/nodejs/unit-http/unit.cpp b/src/nodejs/unit-http/unit.cpp new file mode 100644 index 00000000..40f641a6 --- /dev/null +++ b/src/nodejs/unit-http/unit.cpp @@ -0,0 +1,905 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + +#include "unit.h" + + +napi_ref Unit::constructor_; + + +Unit::Unit(napi_env env): + env_(env), + wrapper_(nullptr), + unit_ctx_(nullptr) +{ +} + + +Unit::~Unit() +{ + napi_delete_reference(env_, wrapper_); +} + + +napi_value +Unit::init(napi_env env, napi_value exports) +{ + napi_value cons, fn; + napi_status status; + + napi_property_descriptor properties[] = { + { "createServer", 0, create_server, 0, 0, 0, napi_default, 0 }, + { "listen", 0, listen, 0, 0, 0, napi_default, 0 } + }; + + status = napi_define_class(env, "Unit", NAPI_AUTO_LENGTH, create, nullptr, + 2, properties, &cons); + if (status != napi_ok) { + goto failed; + } + + status = napi_create_reference(env, cons, 1, &constructor_); + if (status != napi_ok) { + goto failed; + } + + status = napi_set_named_property(env, exports, "Unit", cons); + if (status != napi_ok) { + goto failed; + } + + status = napi_create_function(env, NULL, 0, response_send_headers, NULL, + &fn); + if (status != napi_ok) { + goto failed; + } + + status = napi_set_named_property(env, exports, + "unit_response_headers", fn); + if (status != napi_ok) { + goto failed; + } + + status = napi_create_function(env, NULL, 0, response_write, NULL, &fn); + if (status != napi_ok) { + goto failed; + } + + status = napi_set_named_property(env, exports, "unit_response_write", fn); + if (status != napi_ok) { + goto failed; + } + + status = napi_create_function(env, NULL, 0, response_end, NULL, &fn); + if (status != napi_ok) { + goto failed; + } + + status = napi_set_named_property(env, exports, "unit_response_end", fn); + if (status != napi_ok) { + goto failed; + } + + return exports; + +failed: + + napi_throw_error(env, NULL, "Failed to define Unit class"); + + return nullptr; +} + + +void +Unit::destroy(napi_env env, void *nativeObject, void *finalize_hint) +{ + Unit *obj = reinterpret_cast<Unit *>(nativeObject); + + delete obj; +} + + +napi_value +Unit::create(napi_env env, napi_callback_info info) +{ + Unit *obj; + napi_value target, cons, instance, jsthis; + napi_status status; + + status = napi_get_new_target(env, info, &target); + if (status != napi_ok) { + goto failed; + } + + if (target != nullptr) { + /* Invoked as constructor: `new Unit(...)` */ + status = napi_get_cb_info(env, info, nullptr, nullptr, &jsthis, + nullptr); + if (status != napi_ok) { + goto failed; + } + + obj = new Unit(env); + + status = napi_wrap(env, jsthis, reinterpret_cast<void *>(obj), + destroy, nullptr, &obj->wrapper_); + if (status != napi_ok) { + goto failed; + } + + return jsthis; + } + + /* Invoked as plain function `Unit(...)`, turn into construct call. */ + status = napi_get_reference_value(env, constructor_, &cons); + if (status != napi_ok) { + goto failed; + } + + status = napi_new_instance(env, cons, 0, nullptr, &instance); + if (status != napi_ok) { + goto failed; + } + + return instance; + +failed: + + napi_throw_error(env, NULL, "Failed to create Unit object"); + + return nullptr; +} + + +napi_value +Unit::create_server(napi_env env, napi_callback_info info) +{ + Unit *obj; + size_t argc; + napi_value jsthis; + napi_status status; + napi_value argv[1]; + nxt_unit_init_t unit_init; + + argc = 1; + + status = napi_get_cb_info(env, info, &argc, argv, &jsthis, nullptr); + if (status != napi_ok) { + goto failed; + } + + status = napi_unwrap(env, jsthis, reinterpret_cast<void **>(&obj)); + if (status != napi_ok) { + goto failed; + } + + memset(&unit_init, 0, sizeof(nxt_unit_init_t)); + + unit_init.data = obj; + unit_init.callbacks.request_handler = request_handler; + + obj->unit_ctx_ = nxt_unit_init(&unit_init); + if (obj->unit_ctx_ == NULL) { + goto failed; + } + + return nullptr; + +failed: + + napi_throw_error(env, NULL, "Failed to create Unit object"); + + return nullptr; +} + + +napi_value +Unit::listen(napi_env env, napi_callback_info info) +{ + int ret; + Unit *obj; + napi_value jsthis; + napi_status status; + + status = napi_get_cb_info(env, info, nullptr, nullptr, &jsthis, nullptr); + if (status != napi_ok) { + goto failed; + } + + status = napi_unwrap(env, jsthis, reinterpret_cast<void **>(&obj)); + if (status != napi_ok) { + goto failed; + } + + if (obj->unit_ctx_ == NULL) { + napi_throw_error(env, NULL, "Unit context was not created"); + return nullptr; + } + + ret = nxt_unit_run(obj->unit_ctx_); + if (ret != NXT_UNIT_OK) { + napi_throw_error(env, NULL, "Failed to run Unit"); + return nullptr; + } + + nxt_unit_done(obj->unit_ctx_); + + return nullptr; + +failed: + + napi_throw_error(env, NULL, "Failed to listen Unit socket"); + + return nullptr; +} + + +void +Unit::request_handler(nxt_unit_request_info_t *req) +{ + Unit *obj; + napi_value socket, request, response; + napi_value global, server_obj; + napi_value req_argv[3]; + napi_status status; + + obj = reinterpret_cast<Unit *>(req->unit->data); + + napi_handle_scope scope; + status = napi_open_handle_scope(obj->env_, &scope); + if (status != napi_ok) { + napi_throw_error(obj->env_, NULL, "Failed to create handle scope"); + return; + } + + server_obj = obj->get_server_object(); + if (server_obj == nullptr) { + napi_throw_error(obj->env_, NULL, "Failed to get server object"); + return; + } + + status = napi_get_global(obj->env_, &global); + if (status != napi_ok) { + napi_throw_error(obj->env_, NULL, "Failed to get global variable"); + return; + } + + socket = obj->create_socket(server_obj, req); + if (socket == nullptr) { + napi_throw_error(obj->env_, NULL, "Failed to create socket object"); + return; + } + + request = obj->create_request(server_obj, socket); + if (request == nullptr) { + napi_throw_error(obj->env_, NULL, "Failed to create request object"); + return; + } + + response = obj->create_response(server_obj, socket, request, req, obj); + if (response == nullptr) { + napi_throw_error(obj->env_, NULL, "Failed to create response object"); + return; + } + + req_argv[1] = request; + req_argv[2] = response; + + status = obj->create_headers(req, request); + if (status != napi_ok) { + napi_throw_error(obj->env_, NULL, "Failed to create headers"); + return; + } + + obj->emit(server_obj, "request", sizeof("request") - 1, 3, req_argv); + obj->emit_post_data(request, req); + + napi_close_handle_scope(obj->env_, scope); +} + + +napi_value +Unit::get_server_object() +{ + napi_value unit_obj, server_obj; + napi_status status; + + status = napi_get_reference_value(env_, wrapper_, &unit_obj); + if (status != napi_ok) { + return nullptr; + } + + status = napi_get_named_property(env_, unit_obj, "server", &server_obj); + if (status != napi_ok) { + return nullptr; + } + + return server_obj; +} + + +napi_value +Unit::emit(napi_value obj, const char *name, size_t name_len, size_t argc, + napi_value *argv) +{ + napi_value emitter, return_val, str; + napi_status status; + + status = napi_get_named_property(env_, obj, "emit", &emitter); + if (status != napi_ok) { + return nullptr; + } + + status = napi_create_string_latin1(env_, name, name_len, &str); + if (status != napi_ok) { + return nullptr; + } + + if (argc != 0) { + argv[0] = str; + + } else { + argc = 1; + argv = &str; + } + + status = napi_call_function(env_, obj, emitter, argc, argv, &return_val); + if (status != napi_ok) { + return nullptr; + } + + return return_val; +} + + +napi_status +Unit::create_headers(nxt_unit_request_info_t *req, napi_value request) +{ + uint32_t i; + const char *p; + napi_value headers, raw_headers, str; + napi_status status; + nxt_unit_field_t *f; + nxt_unit_request_t *r; + + r = req->request; + + status = napi_create_object(env_, &headers); + if (status != napi_ok) { + return status; + } + + status = napi_create_array_with_length(env_, r->fields_count * 2, + &raw_headers); + if (status != napi_ok) { + return status; + } + + for (i = 0; i < r->fields_count; i++) { + f = r->fields + i; + + status = this->append_header(f, headers, raw_headers, i); + if (status != napi_ok) { + return status; + } + } + + status = napi_set_named_property(env_, request, "headers", headers); + if (status != napi_ok) { + return status; + } + + status = napi_set_named_property(env_, request, "raw_headers", raw_headers); + if (status != napi_ok) { + return status; + } + + p = (const char *) nxt_unit_sptr_get(&r->version); + + status = napi_create_string_latin1(env_, p, r->version_length, &str); + if (status != napi_ok) { + return status; + } + + status = napi_set_named_property(env_, request, "httpVersion", str); + if (status != napi_ok) { + return status; + } + + p = (const char *) nxt_unit_sptr_get(&r->method); + + status = napi_create_string_latin1(env_, p, r->method_length, &str); + if (status != napi_ok) { + return status; + } + + status = napi_set_named_property(env_, request, "method", str); + if (status != napi_ok) { + return status; + } + + p = (const char *) nxt_unit_sptr_get(&r->target); + + status = napi_create_string_latin1(env_, p, r->target_length, &str); + if (status != napi_ok) { + return status; + } + + status = napi_set_named_property(env_, request, "url", str); + if (status != napi_ok) { + return status; + } + + return napi_ok; +} + + +inline napi_status +Unit::append_header(nxt_unit_field_t *f, napi_value headers, + napi_value raw_headers, uint32_t idx) +{ + const char *name, *value; + napi_value str, vstr; + napi_status status; + + value = (const char *) nxt_unit_sptr_get(&f->value); + + status = napi_create_string_latin1(env_, value, f->value_length, &vstr); + if (status != napi_ok) { + return status; + } + + name = (const char *) nxt_unit_sptr_get(&f->name); + + status = napi_set_named_property(env_, headers, name, vstr); + if (status != napi_ok) { + return status; + } + + status = napi_create_string_latin1(env_, name, f->name_length, &str); + if (status != napi_ok) { + return status; + } + + status = napi_set_element(env_, raw_headers, idx * 2, str); + if (status != napi_ok) { + return status; + } + + status = napi_set_element(env_, raw_headers, idx * 2 + 1, vstr); + if (status != napi_ok) { + return status; + } + + return napi_ok; +} + + +napi_value +Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req) +{ + napi_value constructor, return_val; + napi_status status; + + status = napi_get_named_property(env_, server_obj, "socket", + &constructor); + if (status != napi_ok) { + return nullptr; + } + + status = napi_new_instance(env_, constructor, 0, NULL, &return_val); + if (status != napi_ok) { + return nullptr; + } + + return return_val; +} + + +napi_value +Unit::create_request(napi_value server_obj, napi_value socket) +{ + napi_value constructor, return_val; + napi_status status; + + status = napi_get_named_property(env_, server_obj, "request", + &constructor); + if (status != napi_ok) { + return nullptr; + } + + status = napi_new_instance(env_, constructor, 1, &server_obj, + &return_val); + if (status != napi_ok) { + return nullptr; + } + + status = napi_set_named_property(env_, return_val, "socket", socket); + if (status != napi_ok) { + return nullptr; + } + + return return_val; +} + + +napi_value +Unit::create_response(napi_value server_obj, napi_value socket, + napi_value request, nxt_unit_request_info_t *req, + Unit *obj) +{ + napi_value constructor, return_val, req_num; + napi_status status; + + status = napi_get_named_property(env_, server_obj, "response", + &constructor); + if (status != napi_ok) { + return nullptr; + } + + status = napi_new_instance(env_, constructor, 1, &request, &return_val); + if (status != napi_ok) { + return nullptr; + } + + status = napi_set_named_property(env_, return_val, "socket", socket); + if (status != napi_ok) { + return nullptr; + } + + status = napi_create_int64(env_, (int64_t) (uintptr_t) req, &req_num); + if (status != napi_ok) { + return nullptr; + } + + status = napi_set_named_property(env_, return_val, "_req_point", req_num); + if (status != napi_ok) { + return nullptr; + } + + return return_val; +} + + +void +Unit::emit_post_data(napi_value request, nxt_unit_request_info_t *req) +{ + void *data; + napi_value req_argv[2]; + napi_status status; + + status = napi_create_buffer(env_, (size_t) req->content_length, + &data, &req_argv[1]); + if (status != napi_ok) { + napi_throw_error(env_, NULL, "Failed to create request buffer"); + return; + } + + nxt_unit_request_read(req, data, req->content_length); + + emit(request, "data", sizeof("data") - 1, 2, req_argv); + emit(request, "end", sizeof("end") - 1, 0, nullptr); +} + + +napi_value +Unit::response_send_headers(napi_env env, napi_callback_info info) +{ + int ret; + char *ptr, *name_ptr; + bool is_array; + size_t argc, name_len, value_len; + int64_t req_p; + uint32_t status_code, header_len, keys_len, array_len; + uint32_t keys_count, i, j; + uint16_t hash; + napi_value this_arg, headers, keys, name, value, array_val; + napi_value req_num; + napi_status status; + nxt_unit_field_t *f; + nxt_unit_request_info_t *req; + napi_value argv[5]; + + argc = 5; + + status = napi_get_cb_info(env, info, &argc, argv, &this_arg, NULL); + if (status != napi_ok) { + return nullptr; + } + + if (argc != 5) { + napi_throw_error(env, NULL, "Wrong args count. Need three: " + "statusCode, headers, headers count, headers length"); + return nullptr; + } + + status = napi_get_named_property(env, argv[0], "_req_point", &req_num); + if (status != napi_ok) { + napi_throw_error(env, NULL, "Failed to get request pointer"); + return nullptr; + } + + status = napi_get_value_int64(env, req_num, &req_p); + if (status != napi_ok) { + napi_throw_error(env, NULL, "Failed to get request pointer"); + return nullptr; + } + + req = (nxt_unit_request_info_t *) (uintptr_t) req_p; + + status = napi_get_value_uint32(env, argv[1], &status_code); + if (status != napi_ok) { + goto failed; + } + + status = napi_get_value_uint32(env, argv[3], &keys_count); + if (status != napi_ok) { + goto failed; + } + + status = napi_get_value_uint32(env, argv[4], &header_len); + if (status != napi_ok) { + goto failed; + } + + /* Need to reserve extra byte for C-string 0-termination. */ + header_len++; + + headers = argv[2]; + + ret = nxt_unit_response_init(req, status_code, keys_count, header_len); + if (ret != NXT_UNIT_OK) { + goto failed; + } + + status = napi_get_property_names(env, headers, &keys); + if (status != napi_ok) { + goto failed; + } + + status = napi_get_array_length(env, keys, &keys_len); + if (status != napi_ok) { + goto failed; + } + + ptr = req->response_buf->free; + + for (i = 0; i < keys_len; i++) { + status = napi_get_element(env, keys, i, &name); + if (status != napi_ok) { + goto failed; + } + + status = napi_get_property(env, headers, name, &value); + if (status != napi_ok) { + goto failed; + } + + status = napi_get_value_string_latin1(env, name, ptr, header_len, + &name_len); + if (status != napi_ok) { + goto failed; + } + + name_ptr = ptr; + + ptr += name_len; + header_len -= name_len; + + hash = nxt_unit_field_hash(name_ptr, name_len); + + status = napi_is_array(env, value, &is_array); + if (status != napi_ok) { + goto failed; + } + + if (is_array) { + status = napi_get_array_length(env, value, &array_len); + if (status != napi_ok) { + goto failed; + } + + for (j = 0; j < array_len; j++) { + status = napi_get_element(env, value, j, &array_val); + if (status != napi_ok) { + goto failed; + } + + status = napi_get_value_string_latin1(env, array_val, ptr, + header_len, + &value_len); + if (status != napi_ok) { + goto failed; + } + + f = req->response->fields + req->response->fields_count; + f->skip = 0; + + nxt_unit_sptr_set(&f->name, name_ptr); + + f->name_length = name_len; + f->hash = hash; + + nxt_unit_sptr_set(&f->value, ptr); + f->value_length = (uint32_t) value_len; + + ptr += value_len; + header_len -= value_len; + + req->response->fields_count++; + } + + } else { + status = napi_get_value_string_latin1(env, value, ptr, header_len, + &value_len); + if (status != napi_ok) { + goto failed; + } + + f = req->response->fields + req->response->fields_count; + f->skip = 0; + + nxt_unit_sptr_set(&f->name, name_ptr); + + f->name_length = name_len; + f->hash = hash; + + nxt_unit_sptr_set(&f->value, ptr); + f->value_length = (uint32_t) value_len; + + ptr += value_len; + header_len -= value_len; + + req->response->fields_count++; + } + } + + req->response_buf->free = ptr; + + ret = nxt_unit_response_send(req); + if (ret != NXT_UNIT_OK) { + goto failed; + } + + return this_arg; + +failed: + + req->response->fields_count = 0; + + napi_throw_error(env, NULL, "Failed to write headers"); + + return nullptr; +} + + +napi_value +Unit::response_write(napi_env env, napi_callback_info info) +{ + int ret; + char *ptr; + size_t argc, have_buf_len; + int64_t req_p; + uint32_t buf_len; + napi_value this_arg, req_num; + napi_status status; + nxt_unit_buf_t *buf; + napi_valuetype buf_type; + nxt_unit_request_info_t *req; + napi_value argv[3]; + + argc = 3; + + status = napi_get_cb_info(env, info, &argc, argv, &this_arg, NULL); + if (status != napi_ok) { + goto failed; + } + + if (argc != 3) { + napi_throw_error(env, NULL, "Wrong args count. Need two: " + "chunk, chunk length"); + return nullptr; + } + + status = napi_get_named_property(env, argv[0], "_req_point", &req_num); + if (status != napi_ok) { + napi_throw_error(env, NULL, "Failed to get request pointer"); + return nullptr; + } + + status = napi_get_value_int64(env, req_num, &req_p); + if (status != napi_ok) { + napi_throw_error(env, NULL, "Failed to get request pointer"); + return nullptr; + } + + req = (nxt_unit_request_info_t *) (uintptr_t) req_p; + + status = napi_get_value_uint32(env, argv[2], &buf_len); + if (status != napi_ok) { + goto failed; + } + + status = napi_typeof(env, argv[1], &buf_type); + if (status != napi_ok) { + goto failed; + } + + buf_len++; + + buf = nxt_unit_response_buf_alloc(req, buf_len); + if (buf == NULL) { + goto failed; + } + + if (buf_type == napi_string) { + /* TODO: will work only for utf8 content-type */ + + status = napi_get_value_string_utf8(env, argv[1], buf->free, + buf_len, &have_buf_len); + + } else { + status = napi_get_buffer_info(env, argv[1], (void **) &ptr, + &have_buf_len); + + memcpy(buf->free, ptr, have_buf_len); + } + + if (status != napi_ok) { + goto failed; + } + + buf->free += have_buf_len; + + ret = nxt_unit_buf_send(buf); + if (ret != NXT_UNIT_OK) { + goto failed; + } + + return this_arg; + +failed: + + napi_throw_error(env, NULL, "Failed to write body"); + + return nullptr; +} + + +napi_value +Unit::response_end(napi_env env, napi_callback_info info) +{ + size_t argc; + int64_t req_p; + napi_value resp, this_arg, req_num; + napi_status status; + nxt_unit_request_info_t *req; + + argc = 1; + + status = napi_get_cb_info(env, info, &argc, &resp, &this_arg, NULL); + if (status != napi_ok) { + napi_throw_error(env, NULL, "Failed to finalize sending body"); + return nullptr; + } + + status = napi_get_named_property(env, resp, "_req_point", &req_num); + if (status != napi_ok) { + napi_throw_error(env, NULL, "Failed to get request pointer"); + return nullptr; + } + + status = napi_get_value_int64(env, req_num, &req_p); + if (status != napi_ok) { + napi_throw_error(env, NULL, "Failed to get request pointer"); + return nullptr; + } + + req = (nxt_unit_request_info_t *) (uintptr_t) req_p; + + nxt_unit_request_done(req, NXT_UNIT_OK); + + return this_arg; +} diff --git a/src/nodejs/unit-http/unit.h b/src/nodejs/unit-http/unit.h new file mode 100644 index 00000000..753a14d8 --- /dev/null +++ b/src/nodejs/unit-http/unit.h @@ -0,0 +1,77 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + +#ifndef _NXT_NODEJS_UNIT_H_INCLUDED_ +#define _NXT_NODEJS_UNIT_H_INCLUDED_ + + +#include <node_api.h> + + +#ifdef __cplusplus +extern "C" { +#endif + +#include <nxt_unit.h> +#include <nxt_unit_response.h> +#include <nxt_unit_request.h> + +#ifdef __cplusplus +} /* extern "C" */ +#endif + + +class Unit { +public: + static napi_value init(napi_env env, napi_value exports); + +private: + Unit(napi_env env); + ~Unit(); + + static napi_value create(napi_env env, napi_callback_info info); + static void destroy(napi_env env, void *nativeObject, void *finalize_hint); + + static napi_value create_server(napi_env env, napi_callback_info info); + static napi_value listen(napi_env env, napi_callback_info info); + static void request_handler(nxt_unit_request_info_t *req); + + napi_value get_server_object(); + + napi_value emit(napi_value obj, const char *name, size_t name_len, + size_t argc, napi_value *argv); + + napi_value create_socket(napi_value server_obj, + nxt_unit_request_info_t *req); + + napi_value create_request(napi_value server_obj, napi_value socket); + + napi_value create_response(napi_value server_obj, napi_value socket, + napi_value request, + nxt_unit_request_info_t *req, Unit *obj); + + void emit_post_data(napi_value request, nxt_unit_request_info_t *req); + + static napi_value response_send_headers(napi_env env, + napi_callback_info info); + + static napi_value response_write(napi_env env, napi_callback_info info); + static napi_value response_end(napi_env env, napi_callback_info info); + + napi_status create_headers(nxt_unit_request_info_t *req, + napi_value request); + + inline napi_status append_header(nxt_unit_field_t *f, napi_value headers, + napi_value raw_headers, uint32_t idx); + + static napi_ref constructor_; + + napi_env env_; + napi_ref wrapper_; + nxt_unit_ctx_t *unit_ctx_; +}; + + +#endif /* _NXT_NODEJS_H_INCLUDED_ */ diff --git a/src/nxt_app_log.c b/src/nxt_app_log.c index 83eac371..0ecd9aac 100644 --- a/src/nxt_app_log.c +++ b/src/nxt_app_log.c @@ -97,7 +97,7 @@ static nxt_time_string_t nxt_log_error_time_cache = { "%4d/%02d/%02d %02d:%02d:%02d ", nxt_length("1970/09/28 12:00:00 "), NXT_THREAD_TIME_LOCAL, - NXT_THREAD_TIME_MSEC, + NXT_THREAD_TIME_SEC, }; diff --git a/src/nxt_application.c b/src/nxt_application.c index 3c62f7d4..acdebe04 100644 --- a/src/nxt_application.c +++ b/src/nxt_application.c @@ -507,15 +507,15 @@ nxt_app_parse_type(u_char *p, size_t length) str.length = length; str.start = p; - if (nxt_str_eq(&str, "python", 6)) { + if (nxt_str_eq(&str, "external", 8) || nxt_str_eq(&str, "go", 2)) { + return NXT_APP_EXTERNAL; + + } else if (nxt_str_eq(&str, "python", 6)) { return NXT_APP_PYTHON; } else if (nxt_str_eq(&str, "php", 3)) { return NXT_APP_PHP; - } else if (nxt_str_eq(&str, "go", 2)) { - return NXT_APP_GO; - } else if (nxt_str_eq(&str, "perl", 4)) { return NXT_APP_PERL; diff --git a/src/nxt_application.h b/src/nxt_application.h index 35346655..10f5a922 100644 --- a/src/nxt_application.h +++ b/src/nxt_application.h @@ -15,9 +15,9 @@ typedef enum { + NXT_APP_EXTERNAL, NXT_APP_PYTHON, NXT_APP_PHP, - NXT_APP_GO, NXT_APP_PERL, NXT_APP_RUBY, @@ -40,6 +40,12 @@ typedef struct nxt_common_app_conf_s nxt_common_app_conf_t; typedef struct { + char *executable; + nxt_conf_value_t *arguments; +} nxt_external_app_conf_t; + + +typedef struct { char *home; nxt_str_t path; nxt_str_t module; @@ -55,12 +61,6 @@ typedef struct { typedef struct { - char *executable; - nxt_conf_value_t *arguments; -} nxt_go_app_conf_t; - - -typedef struct { char *script; } nxt_perl_app_conf_t; @@ -80,11 +80,11 @@ struct nxt_common_app_conf_s { nxt_conf_value_t *environment; union { - nxt_python_app_conf_t python; - nxt_php_app_conf_t php; - nxt_go_app_conf_t go; - nxt_perl_app_conf_t perl; - nxt_ruby_app_conf_t ruby; + nxt_external_app_conf_t external; + nxt_python_app_conf_t python; + nxt_php_app_conf_t php; + nxt_perl_app_conf_t perl; + nxt_ruby_app_conf_t ruby; } u; }; @@ -161,7 +161,7 @@ nxt_app_lang_module_t *nxt_app_lang_module(nxt_runtime_t *rt, nxt_str_t *name); nxt_app_type_t nxt_app_parse_type(u_char *p, size_t length); NXT_EXPORT extern nxt_str_t nxt_server; -extern nxt_app_module_t nxt_go_module; +extern nxt_app_module_t nxt_external_module; NXT_EXPORT nxt_int_t nxt_unit_default_init(nxt_task_t *task, nxt_unit_init_t *init); diff --git a/src/nxt_conf_validation.c b/src/nxt_conf_validation.c index ca7107cc..b1e30955 100644 --- a/src/nxt_conf_validation.c +++ b/src/nxt_conf_validation.c @@ -256,6 +256,21 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_common_members[] = { }; +static nxt_conf_vldt_object_t nxt_conf_vldt_external_members[] = { + { nxt_string("executable"), + NXT_CONF_VLDT_STRING, + NULL, + NULL }, + + { nxt_string("arguments"), + NXT_CONF_VLDT_ARRAY, + &nxt_conf_vldt_array_iterator, + (void *) &nxt_conf_vldt_argument }, + + NXT_CONF_VLDT_NEXT(&nxt_conf_vldt_common_members) +}; + + static nxt_conf_vldt_object_t nxt_conf_vldt_python_members[] = { { nxt_string("home"), NXT_CONF_VLDT_STRING, @@ -321,21 +336,6 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_php_members[] = { }; -static nxt_conf_vldt_object_t nxt_conf_vldt_go_members[] = { - { nxt_string("executable"), - NXT_CONF_VLDT_STRING, - NULL, - NULL }, - - { nxt_string("arguments"), - NXT_CONF_VLDT_ARRAY, - &nxt_conf_vldt_array_iterator, - (void *) &nxt_conf_vldt_argument }, - - NXT_CONF_VLDT_NEXT(&nxt_conf_vldt_common_members) -}; - - static nxt_conf_vldt_object_t nxt_conf_vldt_perl_members[] = { { nxt_string("script"), NXT_CONF_VLDT_STRING, @@ -565,9 +565,9 @@ nxt_conf_vldt_app(nxt_conf_validation_t *vldt, nxt_str_t *name, static nxt_str_t type_str = nxt_string("type"); static void *members[] = { + nxt_conf_vldt_external_members, nxt_conf_vldt_python_members, nxt_conf_vldt_php_members, - nxt_conf_vldt_go_members, nxt_conf_vldt_perl_members, nxt_conf_vldt_ruby_members, }; diff --git a/src/nxt_conn.h b/src/nxt_conn.h index 6f6cae9c..d8b48694 100644 --- a/src/nxt_conn.h +++ b/src/nxt_conn.h @@ -175,7 +175,7 @@ struct nxt_conn_s { do { \ (ev)->work_queue = (wq); \ (ev)->log = &(c)->log; \ - (ev)->precision = NXT_TIMER_DEFAULT_PRECISION; \ + (ev)->bias = NXT_TIMER_DEFAULT_BIAS; \ } while (0) diff --git a/src/nxt_conn_read.c b/src/nxt_conn_read.c index 21086233..e458bf81 100644 --- a/src/nxt_conn_read.c +++ b/src/nxt_conn_read.c @@ -105,7 +105,7 @@ nxt_conn_io_read(nxt_task_t *task, void *obj, void *data) * occured during read operation, it toggled write event * internally so only read timer should be set. */ - if (c->read_timer.state == NXT_TIMER_DISABLED) { + if (!c->read_timer.enabled) { nxt_conn_timer(engine, c, state, &c->read_timer); } @@ -117,7 +117,7 @@ nxt_conn_io_read(nxt_task_t *task, void *obj, void *data) nxt_fd_event_enable_read(engine, &c->socket); } - if (state->timer_autoreset || c->read_timer.state == NXT_TIMER_DISABLED) { + if (state->timer_autoreset || !c->read_timer.enabled) { nxt_conn_timer(engine, c, state, &c->read_timer); } } diff --git a/src/nxt_conn_write.c b/src/nxt_conn_write.c index 73c3fa02..80d6f5cf 100644 --- a/src/nxt_conn_write.c +++ b/src/nxt_conn_write.c @@ -127,7 +127,9 @@ nxt_conn_io_write(nxt_task_t *task, void *obj, void *data) return; } - /* ret == NXT_ERROR */ + if (ret != NXT_ERROR) { + return; + } nxt_fd_event_block_write(engine, &c->socket); diff --git a/src/nxt_epoll_engine.c b/src/nxt_epoll_engine.c index 68e8d609..9f9c8f62 100644 --- a/src/nxt_epoll_engine.c +++ b/src/nxt_epoll_engine.c @@ -67,7 +67,7 @@ static void nxt_epoll_enable_accept(nxt_event_engine_t *engine, nxt_fd_event_t *ev); static void nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, int op, uint32_t events); -static nxt_int_t nxt_epoll_commit_changes(nxt_event_engine_t *engine); +static void nxt_epoll_commit_changes(nxt_event_engine_t *engine); static void nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data); #if (NXT_HAVE_SIGNALFD) static nxt_int_t nxt_epoll_add_signal(nxt_event_engine_t *engine); @@ -593,7 +593,7 @@ nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, int op, engine->u.epoll.fd, ev->fd, op, events); if (engine->u.epoll.nchanges >= engine->u.epoll.mchanges) { - (void) nxt_epoll_commit_changes(engine); + nxt_epoll_commit_changes(engine); } ev->changing = 1; @@ -605,18 +605,16 @@ nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, int op, } -static nxt_int_t +static void nxt_epoll_commit_changes(nxt_event_engine_t *engine) { int ret; - nxt_int_t retval; nxt_fd_event_t *ev; nxt_epoll_change_t *change, *end; nxt_debug(&engine->task, "epoll %d changes:%ui", engine->u.epoll.fd, engine->u.epoll.nchanges); - retval = NXT_OK; change = engine->u.epoll.changes; end = change + engine->u.epoll.nchanges; @@ -637,7 +635,7 @@ nxt_epoll_commit_changes(nxt_event_engine_t *engine) nxt_work_queue_add(&engine->fast_work_queue, nxt_epoll_error_handler, ev->task, ev, ev->data); - retval = NXT_ERROR; + engine->u.epoll.error = 1; } change++; @@ -645,8 +643,6 @@ nxt_epoll_commit_changes(nxt_event_engine_t *engine) } while (change < end); engine->u.epoll.nchanges = 0; - - return retval; } @@ -884,10 +880,13 @@ nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout) struct epoll_event *event; if (engine->u.epoll.nchanges != 0) { - if (nxt_epoll_commit_changes(engine) != NXT_OK) { - /* Error handlers have been enqueued on failure. */ - timeout = 0; - } + nxt_epoll_commit_changes(engine); + } + + if (engine->u.epoll.error) { + engine->u.epoll.error = 0; + /* Error handlers have been enqueued on failure. */ + timeout = 0; } nxt_debug(&engine->task, "epoll_wait(%d) timeout:%M", @@ -921,8 +920,8 @@ nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout) ev->fd, events, ev, ev->read, ev->write); /* - * On error epoll may set EPOLLERR and EPOLLHUP only without EPOLLIN or - * EPOLLOUT, so the "error" variable enqueues only one active handler. + * On error epoll may set EPOLLERR and EPOLLHUP only without EPOLLIN + * or EPOLLOUT, so the "error" variable enqueues only error handler. */ error = ((events & (EPOLLERR | EPOLLHUP)) != 0); ev->epoll_error = error; @@ -933,7 +932,7 @@ nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout) #endif - if ((events & EPOLLIN) || error) { + if ((events & EPOLLIN) != 0) { ev->read_ready = 1; if (ev->read != NXT_EVENT_BLOCKED) { @@ -942,8 +941,6 @@ nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout) ev->read = NXT_EVENT_DISABLED; } - error = 0; - nxt_work_queue_add(ev->read_work_queue, ev->read_handler, ev->task, ev, ev->data); @@ -951,9 +948,11 @@ nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout) /* Level-triggered mode. */ nxt_epoll_disable_read(engine, ev); } + + error = 0; } - if ((events & EPOLLOUT) || error) { + if ((events & EPOLLOUT) != 0) { ev->write_ready = 1; if (ev->write != NXT_EVENT_BLOCKED) { @@ -962,8 +961,6 @@ nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout) ev->write = NXT_EVENT_DISABLED; } - error = 0; - nxt_work_queue_add(ev->write_work_queue, ev->write_handler, ev->task, ev, ev->data); @@ -971,12 +968,29 @@ nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout) /* Level-triggered mode. */ nxt_epoll_disable_write(engine, ev); } + + error = 0; } - if (error) { - ev->read_ready = 1; - ev->write_ready = 1; + if (!error) { + continue; + } + + ev->read_ready = 1; + ev->write_ready = 1; + + if (ev->read == NXT_EVENT_BLOCKED && ev->write == NXT_EVENT_BLOCKED) { + + if (engine->u.epoll.mode == 0) { + /* Level-triggered mode. */ + nxt_epoll_disable(engine, ev); + } + + continue; } + + nxt_work_queue_add(&engine->fast_work_queue, nxt_epoll_error_handler, + ev->task, ev, ev->data); } } diff --git a/src/nxt_event_engine.h b/src/nxt_event_engine.h index 7a7149e6..365d9e89 100644 --- a/src/nxt_event_engine.h +++ b/src/nxt_event_engine.h @@ -202,6 +202,8 @@ typedef struct { nxt_uint_t mchanges; int mevents; + uint8_t error; /* 1 bit */ + nxt_epoll_change_t *changes; struct epoll_event *events; diff --git a/src/nxt_go.c b/src/nxt_external.c index 4e7d0488..c7aacffc 100644 --- a/src/nxt_go.c +++ b/src/nxt_external.c @@ -9,21 +9,24 @@ #include <nxt_unit.h> -static nxt_int_t nxt_go_init(nxt_task_t *task, nxt_common_app_conf_t *conf); +static nxt_int_t nxt_external_init(nxt_task_t *task, + nxt_common_app_conf_t *conf); -nxt_app_module_t nxt_go_module = { + +nxt_app_module_t nxt_external_module = { 0, NULL, - nxt_string("go"), + nxt_string("external"), "*", - nxt_go_init, + nxt_external_init, }; extern char **environ; + nxt_inline nxt_int_t -nxt_go_fd_no_cloexec(nxt_task_t *task, nxt_socket_t fd) +nxt_external_fd_no_cloexec(nxt_task_t *task, nxt_socket_t fd) { int res, flags; @@ -54,20 +57,20 @@ nxt_go_fd_no_cloexec(nxt_task_t *task, nxt_socket_t fd) static nxt_int_t -nxt_go_init(nxt_task_t *task, nxt_common_app_conf_t *conf) +nxt_external_init(nxt_task_t *task, nxt_common_app_conf_t *conf) { - char **argv; - u_char buf[256]; - u_char *p, *end; - uint32_t index; - size_t size; - nxt_str_t str; - nxt_int_t rc; - nxt_uint_t i, argc; - nxt_port_t *my_port, *main_port; - nxt_runtime_t *rt; - nxt_conf_value_t *value; - nxt_go_app_conf_t *c; + char **argv; + u_char buf[256]; + u_char *p, *end; + uint32_t index; + size_t size; + nxt_str_t str; + nxt_int_t rc; + nxt_uint_t i, argc; + nxt_port_t *my_port, *main_port; + nxt_runtime_t *rt; + nxt_conf_value_t *value; + nxt_external_app_conf_t *c; rt = task->thread->runtime; @@ -78,12 +81,12 @@ nxt_go_init(nxt_task_t *task, nxt_common_app_conf_t *conf) return NXT_ERROR; } - rc = nxt_go_fd_no_cloexec(task, main_port->pair[1]); + rc = nxt_external_fd_no_cloexec(task, main_port->pair[1]); if (nxt_slow_path(rc != NXT_OK)) { return NXT_ERROR; } - rc = nxt_go_fd_no_cloexec(task, my_port->pair[0]); + rc = nxt_external_fd_no_cloexec(task, my_port->pair[0]); if (nxt_slow_path(rc != NXT_OK)) { return NXT_ERROR; } @@ -116,7 +119,7 @@ nxt_go_init(nxt_task_t *task, nxt_common_app_conf_t *conf) return NXT_ERROR; } - c = &conf->u.go; + c = &conf->u.external; argc = 2; size = 0; diff --git a/src/nxt_h1proto.c b/src/nxt_h1proto.c index 3e5044b7..2194e56f 100644 --- a/src/nxt_h1proto.c +++ b/src/nxt_h1proto.c @@ -967,11 +967,14 @@ nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r) http11 = (h1p->parser.version.s.minor != '0'); if (r->resp.content_length == NULL || r->resp.content_length->skip) { + if (http11) { - h1p->chunked = 1; - size += nxt_length(chunked); - /* Trailing CRLF will be added by the first chunk header. */ - size -= nxt_length("\r\n"); + if (n != NXT_HTTP_NOT_MODIFIED && n != NXT_HTTP_NO_CONTENT) { + h1p->chunked = 1; + size += nxt_length(chunked); + /* Trailing CRLF will be added by the first chunk header. */ + size -= nxt_length("\r\n"); + } } else { h1p->keepalive = 0; diff --git a/src/nxt_http.h b/src/nxt_http.h index 1c7ed8bb..b2111f90 100644 --- a/src/nxt_http.h +++ b/src/nxt_http.h @@ -12,6 +12,7 @@ typedef enum { NXT_HTTP_INVALID = 0, NXT_HTTP_OK = 200, + NXT_HTTP_NO_CONTENT = 204, NXT_HTTP_MULTIPLE_CHOICES = 300, NXT_HTTP_MOVED_PERMANENTLY = 301, diff --git a/src/nxt_http_request.c b/src/nxt_http_request.c index 06d45247..b80998cb 100644 --- a/src/nxt_http_request.c +++ b/src/nxt_http_request.c @@ -192,7 +192,7 @@ nxt_http_app_request(nxt_task_t *task, void *obj, void *data) ar->timer.task = &engine->task; ar->timer.work_queue = &engine->fast_work_queue; ar->timer.log = engine->task.log; - ar->timer.precision = NXT_TIMER_DEFAULT_PRECISION; + ar->timer.bias = NXT_TIMER_DEFAULT_BIAS; ar->r.remote.start = nxt_sockaddr_address(r->remote); ar->r.remote.length = r->remote->address_length; diff --git a/src/nxt_main.h b/src/nxt_main.h index 1bbd9ee5..670010f8 100644 --- a/src/nxt_main.h +++ b/src/nxt_main.h @@ -11,8 +11,8 @@ #include <nxt_auto_config.h> -#define NXT_VERSION "1.4" -#define NXT_VERNUM 10400 +#define NXT_VERSION "1.5" +#define NXT_VERNUM 10500 #define NXT_SERVER "Unit/" NXT_VERSION diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c index 491b09f3..819ed44c 100644 --- a/src/nxt_main_process.c +++ b/src/nxt_main_process.c @@ -134,6 +134,22 @@ static nxt_conf_map_t nxt_common_app_conf[] = { }; +static nxt_conf_map_t nxt_external_app_conf[] = { + { + nxt_string("executable"), + NXT_CONF_MAP_CSTRZ, + offsetof(nxt_common_app_conf_t, u.external.executable), + }, + + { + nxt_string("arguments"), + NXT_CONF_MAP_PTR, + offsetof(nxt_common_app_conf_t, u.external.arguments), + }, + +}; + + static nxt_conf_map_t nxt_python_app_conf[] = { { nxt_string("home"), @@ -182,22 +198,6 @@ static nxt_conf_map_t nxt_php_app_conf[] = { }; -static nxt_conf_map_t nxt_go_app_conf[] = { - { - nxt_string("executable"), - NXT_CONF_MAP_CSTRZ, - offsetof(nxt_common_app_conf_t, u.go.executable), - }, - - { - nxt_string("arguments"), - NXT_CONF_MAP_PTR, - offsetof(nxt_common_app_conf_t, u.go.arguments), - }, - -}; - - static nxt_conf_map_t nxt_perl_app_conf[] = { { nxt_string("script"), @@ -217,11 +217,11 @@ static nxt_conf_map_t nxt_ruby_app_conf[] = { static nxt_conf_app_map_t nxt_app_maps[] = { - { nxt_nitems(nxt_python_app_conf), nxt_python_app_conf }, - { nxt_nitems(nxt_php_app_conf), nxt_php_app_conf }, - { nxt_nitems(nxt_go_app_conf), nxt_go_app_conf }, - { nxt_nitems(nxt_perl_app_conf), nxt_perl_app_conf }, - { nxt_nitems(nxt_ruby_app_conf), nxt_ruby_app_conf }, + { nxt_nitems(nxt_external_app_conf), nxt_external_app_conf }, + { nxt_nitems(nxt_python_app_conf), nxt_python_app_conf }, + { nxt_nitems(nxt_php_app_conf), nxt_php_app_conf }, + { nxt_nitems(nxt_perl_app_conf), nxt_perl_app_conf }, + { nxt_nitems(nxt_ruby_app_conf), nxt_ruby_app_conf }, }; diff --git a/src/nxt_openssl.c b/src/nxt_openssl.c index 91ba3cb0..441da54b 100644 --- a/src/nxt_openssl.c +++ b/src/nxt_openssl.c @@ -404,7 +404,7 @@ nxt_openssl_chain_file(SSL_CTX *ctx, nxt_fd_t fd) * while the main certificate needs a X509_free() call, since * its reference count is increased by SSL_CTX_use_certificate(). */ -#if OPENSSL_VERSION_NUMBER > 0x10002000L +#ifdef SSL_CTX_add0_chain_cert if (SSL_CTX_add0_chain_cert(ctx, ca) != 1) { #else if (SSL_CTX_add_extra_chain_cert(ctx, ca) != 1) { diff --git a/src/nxt_port.c b/src/nxt_port.c index 3d18fa67..30719ad3 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -276,6 +276,8 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_process_use(task, process, -1); + nxt_fd_nonblocking(task, msg->fd); + port->pair[0] = -1; port->pair[1] = msg->fd; port->max_size = new_port_msg->max_size; diff --git a/src/nxt_router.c b/src/nxt_router.c index 139b2c4c..7ecbca81 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -289,9 +289,9 @@ static const nxt_str_t http_prefix = nxt_string("HTTP_"); static const nxt_str_t empty_prefix = nxt_string(""); static const nxt_str_t *nxt_app_msg_prefix[] = { + &empty_prefix, &http_prefix, &http_prefix, - &empty_prefix, &http_prefix, &http_prefix, }; @@ -1578,7 +1578,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, app_joint->use_count = 1; app_joint->app = app; - app_joint->idle_timer.precision = NXT_TIMER_DEFAULT_PRECISION; + app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS; app_joint->idle_timer.work_queue = &engine->fast_work_queue; app_joint->idle_timer.handler = nxt_router_app_idle_timeout; app_joint->idle_timer.task = &engine->task; @@ -4119,7 +4119,7 @@ nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data) nxt_assert(app->engine == engine); - threshold = engine->timers.now + app->joint->idle_timer.precision; + threshold = engine->timers.now + app->joint->idle_timer.bias; timeout = 0; nxt_thread_mutex_lock(&app->mutex); diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c index 311c2c08..547c7494 100644 --- a/src/nxt_runtime.c +++ b/src/nxt_runtime.c @@ -80,10 +80,10 @@ nxt_runtime_create(nxt_task_t *task) /* Should not fail. */ lang = nxt_array_add(rt->languages); - lang->type = NXT_APP_GO; + lang->type = NXT_APP_EXTERNAL; lang->version = (u_char *) ""; lang->file = NULL; - lang->module = &nxt_go_module; + lang->module = &nxt_external_module; listen_sockets = nxt_array_create(mp, 1, sizeof(nxt_listen_socket_t)); if (nxt_slow_path(listen_sockets == NULL)) { @@ -872,20 +872,6 @@ nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt) continue; } - if (nxt_strcmp(p, "--upstream") == 0) { - if (*argv == NULL) { - nxt_alert(task, "no argument for option \"--upstream\""); - return NXT_ERROR; - } - - p = *argv++; - - rt->upstream.length = nxt_strlen(p); - rt->upstream.start = (u_char *) p; - - continue; - } - if (nxt_strcmp(p, "--user") == 0) { if (*argv == NULL) { write(STDERR_FILENO, no_user, nxt_length(no_user)); diff --git a/src/nxt_runtime.h b/src/nxt_runtime.h index e4c9e2a1..496ae478 100644 --- a/src/nxt_runtime.h +++ b/src/nxt_runtime.h @@ -74,7 +74,6 @@ struct nxt_runtime_s { nxt_sockaddr_t *controller_listen; nxt_listen_socket_t *controller_socket; - nxt_str_t upstream; }; diff --git a/src/nxt_timer.c b/src/nxt_timer.c index f56ddeb6..cba4755b 100644 --- a/src/nxt_timer.c +++ b/src/nxt_timer.c @@ -32,6 +32,10 @@ nxt_timers_init(nxt_timers_t *timers, nxt_uint_t mchanges) { nxt_rbtree_init(&timers->tree, nxt_timer_rbtree_compare); + if (mchanges > NXT_TIMER_MAX_CHANGES) { + mchanges = NXT_TIMER_MAX_CHANGES; + } + timers->mchanges = mchanges; timers->changes = nxt_malloc(sizeof(nxt_timer_change_t) * mchanges); @@ -71,66 +75,50 @@ nxt_timer_add(nxt_event_engine_t *engine, nxt_timer_t *timer, time = engine->timers.now + timeout; - if (timer->state != NXT_TIMER_CHANGING) { + nxt_debug(timer->task, "timer add: %M±%d %M:%M", + timer->time, timer->bias, timeout, time); - if (nxt_timer_is_in_tree(timer)) { + timer->enabled = 1; - diff = nxt_msec_diff(time, timer->time); - /* - * Use the previous timer if difference between it and the - * new timer is less than required precision milliseconds: this - * decreases number of rbtree operations for fast connections. - */ - if (nxt_abs(diff) < timer->precision) { - nxt_debug(timer->task, "timer previous: %M:%d", - time, timer->state); + if (nxt_timer_is_in_tree(timer)) { - timer->state = NXT_TIMER_WAITING; - return; - } + diff = nxt_msec_diff(time, timer->time); + /* + * Use the previous timer if difference between it and the + * new timer is within bias: this decreases number of rbtree + * operations for fast connections. + */ + if (nxt_abs(diff) <= timer->bias) { + nxt_debug(timer->task, "timer previous: %M±%d", + time, timer->bias); + + nxt_timer_change(engine, timer, NXT_TIMER_NOPE, 0); + return; } } - nxt_debug(timer->task, "timer add: %M:%d %M:%M", - timer->time, timer->state, timeout, time); - nxt_timer_change(engine, timer, NXT_TIMER_ADD, time); } -void -nxt_timer_disable(nxt_event_engine_t *engine, nxt_timer_t *timer) -{ - nxt_debug(timer->task, "timer disable: %M:%d", timer->time, timer->state); - - if (timer->state != NXT_TIMER_CHANGING) { - timer->state = NXT_TIMER_DISABLED; - - } else { - nxt_timer_change(engine, timer, NXT_TIMER_DISABLE, 0); - } -} - - nxt_bool_t nxt_timer_delete(nxt_event_engine_t *engine, nxt_timer_t *timer) { - nxt_bool_t pending; + nxt_debug(timer->task, "timer delete: %M±%d", + timer->time, timer->bias); + + timer->enabled = 0; - if (nxt_timer_is_in_tree(timer) || timer->state == NXT_TIMER_CHANGING) { - nxt_debug(timer->task, "timer delete: %M:%d", - timer->time, timer->state); + if (nxt_timer_is_in_tree(timer)) { nxt_timer_change(engine, timer, NXT_TIMER_DELETE, 0); return 1; } - pending = (timer->state == NXT_TIMER_ENQUEUED); - - timer->state = NXT_TIMER_DISABLED; + nxt_timer_change(engine, timer, NXT_TIMER_NOPE, 0); - return pending; + return (timer->queued || timer->change != NXT_TIMER_NO_CHANGE); } @@ -143,31 +131,36 @@ nxt_timer_change(nxt_event_engine_t *engine, nxt_timer_t *timer, timers = &engine->timers; - if (timers->nchanges >= timers->mchanges) { - nxt_timer_changes_commit(engine); - } + if (timer->change == NXT_TIMER_NO_CHANGE) { + + if (change == NXT_TIMER_NOPE) { + return; + } - nxt_debug(timer->task, "timer change: %M:%d", time, change); + if (timers->nchanges >= timers->mchanges) { + nxt_timer_changes_commit(engine); + } + + timers->nchanges++; + timer->change = timers->nchanges; + } - timer->state = NXT_TIMER_CHANGING; + nxt_debug(timer->task, "timer change: %M±%d:%d", + time, timer->bias, change); - ch = &timers->changes[timers->nchanges]; + ch = &timers->changes[timer->change - 1]; ch->change = change; ch->time = time; ch->timer = timer; - - timers->nchanges++; } static void nxt_timer_changes_commit(nxt_event_engine_t *engine) { - int32_t diff; - nxt_timer_t *timer; + nxt_timer_t *timer, **add, **add_end; nxt_timers_t *timers; - nxt_timer_state_t state; nxt_timer_change_t *ch, *end; timers = &engine->timers; @@ -177,61 +170,54 @@ nxt_timer_changes_commit(nxt_event_engine_t *engine) ch = timers->changes; end = ch + timers->nchanges; - while (ch < end) { - state = NXT_TIMER_DISABLED; + add = (nxt_timer_t **) ch; + add_end = add; + while (ch < end) { timer = ch->timer; switch (ch->change) { - case NXT_TIMER_ADD: - if (nxt_timer_is_in_tree(timer)) { - - diff = nxt_msec_diff(ch->time, timer->time); - /* See the comment in nxt_timer_add(). */ + case NXT_TIMER_NOPE: + break; - if (nxt_abs(diff) < timer->precision) { - nxt_debug(timer->task, "timer rbtree previous: %M:%d", - ch->time, timer->state); + case NXT_TIMER_ADD: - state = NXT_TIMER_WAITING; - break; - } + timer->time = ch->time; - nxt_debug(timer->task, "timer rbtree delete: %M:%d", - timer->time, timer->state); + *add_end++ = timer; - nxt_rbtree_delete(&timers->tree, &timer->node); + if (!nxt_timer_is_in_tree(timer)) { + break; } - timer->time = ch->time; + /* Fall through. */ - nxt_debug(timer->task, "timer rbtree insert: %M", timer->time); + case NXT_TIMER_DELETE: + nxt_debug(timer->task, "timer rbtree delete: %M±%d", + timer->time, timer->bias); - nxt_rbtree_insert(&timers->tree, &timer->node); - nxt_timer_in_tree_set(timer); - state = NXT_TIMER_WAITING; + nxt_rbtree_delete(&timers->tree, &timer->node); + nxt_timer_in_tree_clear(timer); break; + } - case NXT_TIMER_DELETE: - if (nxt_timer_is_in_tree(timer)) { - nxt_debug(timer->task, "timer rbtree delete: %M:%d", - timer->time, timer->state); + timer->change = NXT_TIMER_NO_CHANGE; - nxt_rbtree_delete(&timers->tree, &timer->node); - nxt_timer_in_tree_clear(timer); - } + ch++; + } - break; + while (add < add_end) { + timer = *add; - case NXT_TIMER_DISABLE: - break; - } + nxt_debug(timer->task, "timer rbtree insert: %M±%d", + timer->time, timer->bias); - timer->state = state; + nxt_rbtree_insert(&timers->tree, &timer->node); + nxt_timer_in_tree_set(timer); - ch++; + add++; } timers->nchanges = 0; @@ -270,12 +256,12 @@ nxt_timer_find(nxt_event_engine_t *engine) * return much earlier and the disabled timer can be reactivated. */ - if (timer->state != NXT_TIMER_DISABLED) { + if (timer->enabled) { time = timer->time; - timers->minimum = time; + timers->minimum = time - timer->bias; - nxt_debug(timer->task, "timer found minimum: %M:%M", - time, timers->now); + nxt_debug(timer->task, "timer found minimum: %M±%d:%M", + time, timer->bias, timers->now); delta = nxt_msec_diff(time, timers->now); @@ -317,21 +303,21 @@ nxt_timer_expire(nxt_event_engine_t *engine, nxt_msec_t now) { timer = (nxt_timer_t *) node; - /* timer->time > now */ - if (nxt_msec_diff(timer->time , now) > 0) { + /* timer->time > now + timer->bias */ + if (nxt_msec_diff(timer->time , now) > (int32_t) timer->bias) { return; } next = nxt_rbtree_node_successor(tree, node); - nxt_debug(timer->task, "timer expire delete: %M:%d", - timer->time, timer->state); + nxt_debug(timer->task, "timer expire delete: %M±%d", + timer->time, timer->bias); nxt_rbtree_delete(tree, &timer->node); nxt_timer_in_tree_clear(timer); - if (timer->state != NXT_TIMER_DISABLED) { - timer->state = NXT_TIMER_ENQUEUED; + if (timer->enabled) { + timer->queued = 1; nxt_work_queue_add(timer->work_queue, nxt_timer_handler, timer->task, timer, NULL); @@ -347,8 +333,10 @@ nxt_timer_handler(nxt_task_t *task, void *obj, void *data) timer = obj; - if (timer->state == NXT_TIMER_ENQUEUED) { - timer->state = NXT_TIMER_DISABLED; + timer->queued = 0; + + if (timer->enabled && timer->change == NXT_TIMER_NO_CHANGE) { + timer->enabled = 0; timer->handler(task, timer, NULL); } diff --git a/src/nxt_timer.h b/src/nxt_timer.h index 6531657d..4199f0dd 100644 --- a/src/nxt_timer.h +++ b/src/nxt_timer.h @@ -8,25 +8,29 @@ #define _NXT_TIMER_H_INCLUDED_ -/* Valid values are between 1ms to 255ms. */ -#define NXT_TIMER_DEFAULT_PRECISION 100 -//#define NXT_TIMER_DEFAULT_PRECISION 1 +/* Valid values are between 0ms to 255ms. */ +#define NXT_TIMER_DEFAULT_BIAS 50 +//#define NXT_TIMER_DEFAULT_BIAS 0 -typedef enum { - NXT_TIMER_DISABLED = 0, - NXT_TIMER_CHANGING, - NXT_TIMER_WAITING, - NXT_TIMER_ENQUEUED, -} nxt_timer_state_t; +/* + * The nxt_timer_t structure can hold up to 14 bits of change index, + * but 0 reserved for NXT_TIMER_NO_CHANGE. + */ +#define NXT_TIMER_MAX_CHANGES 16383 +#define NXT_TIMER_NO_CHANGE 0 typedef struct { /* The rbtree node must be the first field. */ NXT_RBTREE_NODE (node); - nxt_timer_state_t state:8; - uint8_t precision; + uint8_t bias; + + uint16_t change:14; + uint16_t enabled:1; + uint16_t queued:1; + nxt_msec_t time; nxt_work_queue_t *work_queue; @@ -37,13 +41,13 @@ typedef struct { } nxt_timer_t; -#define NXT_TIMER { NXT_RBTREE_NODE_INIT, NXT_TIMER_DISABLED, \ - 0, 0, NULL, NULL, NULL, NULL } +#define NXT_TIMER { NXT_RBTREE_NODE_INIT, 0, NXT_TIMER_NO_CHANGE, \ + 0, 0, 0, NULL, NULL, NULL, NULL } typedef enum { - NXT_TIMER_ADD = 0, - NXT_TIMER_DISABLE, + NXT_TIMER_NOPE = 0, + NXT_TIMER_ADD, NXT_TIMER_DELETE, } nxt_timer_operation_t; @@ -94,10 +98,16 @@ void nxt_timer_expire(nxt_event_engine_t *engine, nxt_msec_t now); NXT_EXPORT void nxt_timer_add(nxt_event_engine_t *engine, nxt_timer_t *timer, nxt_msec_t timeout); -NXT_EXPORT void nxt_timer_disable(nxt_event_engine_t *engine, - nxt_timer_t *timer); NXT_EXPORT nxt_bool_t nxt_timer_delete(nxt_event_engine_t *engine, nxt_timer_t *timer); +nxt_inline void +nxt_timer_disable(nxt_event_engine_t *engine, nxt_timer_t *timer) +{ + nxt_debug(timer->task, "timer disable: %M", timer->time); + + timer->enabled = 0; +} + #endif /* _NXT_TIMER_H_INCLUDED_ */ diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 0f46dc7e..0d1be557 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -510,7 +510,7 @@ int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, void *buf, size_t buf_size, void *oob, size_t oob_size) { - int fd, rc; + int fd, rc, nb; pid_t pid; nxt_queue_t incoming_buf; struct cmsghdr *cm; @@ -611,6 +611,15 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, port_msg->stream, (int) new_port_msg->pid, (int) new_port_msg->id, fd); + nb = 0; + + if (nxt_slow_path(ioctl(fd, FIONBIO, &nb) == -1)) { + nxt_unit_alert(ctx, "#%"PRIu32": new_port: ioctl(%d, FIONBIO, 0) " + "failed: %s (%d)", fd, strerror(errno), errno); + + goto fail; + } + nxt_unit_port_id_init(&new_port.id, new_port_msg->pid, new_port_msg->id); @@ -2158,9 +2167,12 @@ nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd) msg.mf = 0; msg.tracking = 0; -#if (NXT_VALGRIND) + /* + * Fill all padding fields with 0. + * Code in Go 1.11 validate cmsghdr using padding field as part of len. + * See Cmsghdr definition and socketControlMessageHeaderAndData function. + */ memset(&cmsg, 0, sizeof(cmsg)); -#endif cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int)); cmsg.cm.cmsg_level = SOL_SOCKET; @@ -2992,9 +3004,7 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst, m.new_port.max_size = 16 * 1024; m.new_port.max_share = 64 * 1024; -#if (NXT_VALGRIND) memset(&cmsg, 0, sizeof(cmsg)); -#endif cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int)); cmsg.cm.cmsg_level = SOL_SOCKET; diff --git a/src/nxt_worker_process.c b/src/nxt_worker_process.c index 8bcfb467..c068cca4 100644 --- a/src/nxt_worker_process.c +++ b/src/nxt_worker_process.c @@ -42,8 +42,8 @@ nxt_port_handlers_t nxt_discovery_process_port_handlers = { const nxt_sig_event_t nxt_worker_process_signals[] = { nxt_event_signal(SIGHUP, nxt_worker_process_signal_handler), nxt_event_signal(SIGINT, nxt_worker_process_sigterm_handler), - nxt_event_signal(SIGQUIT, nxt_worker_process_sigterm_handler), - nxt_event_signal(SIGTERM, nxt_worker_process_sigquit_handler), + nxt_event_signal(SIGQUIT, nxt_worker_process_sigquit_handler), + nxt_event_signal(SIGTERM, nxt_worker_process_sigterm_handler), nxt_event_signal(SIGCHLD, nxt_worker_process_signal_handler), nxt_event_signal(SIGUSR1, nxt_worker_process_signal_handler), nxt_event_signal(SIGUSR2, nxt_worker_process_signal_handler), |