diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/nodejs/unit-http/http_server.js | 50 | ||||
-rw-r--r-- | src/nodejs/unit-http/unit.cpp | 271 | ||||
-rw-r--r-- | src/nodejs/unit-http/unit.h | 6 | ||||
-rw-r--r-- | src/nxt_app_queue.h | 18 | ||||
-rw-r--r-- | src/nxt_application.h | 14 | ||||
-rw-r--r-- | src/nxt_conf_validation.c | 35 | ||||
-rw-r--r-- | src/nxt_controller.c | 37 | ||||
-rw-r--r-- | src/nxt_h1proto.c | 39 | ||||
-rw-r--r-- | src/nxt_http.h | 1 | ||||
-rw-r--r-- | src/nxt_http_route.c | 6 | ||||
-rw-r--r-- | src/nxt_http_static.c | 23 | ||||
-rw-r--r-- | src/nxt_isolation.c | 6 | ||||
-rw-r--r-- | src/nxt_main_process.c | 94 | ||||
-rw-r--r-- | src/nxt_php_sapi.c | 11 | ||||
-rw-r--r-- | src/nxt_port.c | 8 | ||||
-rw-r--r-- | src/nxt_process.h | 2 | ||||
-rw-r--r-- | src/nxt_router.c | 83 | ||||
-rw-r--r-- | src/nxt_runtime.c | 2 | ||||
-rw-r--r-- | src/nxt_unit.c | 68 | ||||
-rw-r--r-- | src/nxt_unit_request.h | 1 | ||||
-rw-r--r-- | src/python/nxt_python.c | 103 | ||||
-rw-r--r-- | src/python/nxt_python_asgi.c | 41 | ||||
-rw-r--r-- | src/python/nxt_python_wsgi.c | 65 | ||||
-rw-r--r-- | src/ruby/nxt_ruby.c | 14 |
24 files changed, 713 insertions, 285 deletions
diff --git a/src/nodejs/unit-http/http_server.js b/src/nodejs/unit-http/http_server.js index d378e410..e59296ae 100644 --- a/src/nodejs/unit-http/http_server.js +++ b/src/nodejs/unit-http/http_server.js @@ -11,6 +11,7 @@ const util = require('util'); const unit_lib = require('./build/Release/unit-http'); const Socket = require('./socket'); const WebSocketFrame = require('./websocket_frame'); +const Readable = require('stream').Readable; function ServerResponse(req) { @@ -23,6 +24,7 @@ function ServerResponse(req) { req._response = this; this.socket = req.socket; this.connection = req.connection; + this.writable = true; } util.inherits(ServerResponse, EventEmitter); @@ -268,6 +270,7 @@ ServerResponse.prototype._writeBody = function(chunk, encoding, callback) { res = this._write(chunk, 0, contentLength); if (res < contentLength) { this.socket.writable = false; + this.writable = false; o = new BufferedOutput(this, res, chunk, encoding, callback); this.server._output.push(o); @@ -328,6 +331,8 @@ ServerResponse.prototype.end = function end(chunk, encoding, callback) { if (typeof callback === 'function') { callback(); } + + this.emit("finish"); }); this.finished = true; @@ -337,15 +342,14 @@ ServerResponse.prototype.end = function end(chunk, encoding, callback) { }; function ServerRequest(server, socket) { - EventEmitter.call(this); + Readable.call(this); this.server = server; this.socket = socket; this.connection = socket; + this._pushed_eofchunk = false; } -util.inherits(ServerRequest, EventEmitter); - -ServerRequest.prototype.unpipe = undefined; +util.inherits(ServerRequest, Readable); ServerRequest.prototype.setTimeout = function setTimeout(msecs, callback) { this.timeout = msecs; @@ -377,35 +381,21 @@ ServerRequest.prototype.STATUS_CODES = function STATUS_CODES() { return http.STATUS_CODES; }; -ServerRequest.prototype.listeners = function listeners() { - return []; -}; - -ServerRequest.prototype.resume = function resume() { - return []; -}; +ServerRequest.prototype._request_read = unit_lib.request_read; -/* - * The "on" method is overridden to defer reading data until user code is - * ready, that is (ev === "data"). This can occur after req.emit("end") is - * executed, since the user code can be scheduled asynchronously by Promises - * and so on. Passing the data is postponed by process.nextTick() until - * the "on" method caller completes. - */ -ServerRequest.prototype.on = function on(ev, fn) { - Server.prototype.on.call(this, ev, fn); +ServerRequest.prototype._read = function _read(n) { + const b = this._request_read(n); - if (ev === "data") { - process.nextTick(function () { - if (this._data.length !== 0) { - this.emit("data", this._data); - } + if (b != null) { + this.push(b); + } - }.bind(this)); + if (!this._pushed_eofchunk && (b == null || b.length < n)) { + this._pushed_eofchunk = true; + this.push(null); } }; -ServerRequest.prototype.addListener = ServerRequest.prototype.on; function Server(requestListener) { EventEmitter.call(this); @@ -472,11 +462,6 @@ Server.prototype.emit_request = function (req, res) { } else { this.emit("request", req, res); } - - process.nextTick(() => { - req.emit("finish"); - req.emit("end"); - }); }; Server.prototype.emit_close = function () { @@ -523,6 +508,7 @@ Server.prototype.emit_drain = function () { } resp.socket.writable = true; + resp.writable = true; process.nextTick(() => { resp.emit("drain"); diff --git a/src/nodejs/unit-http/unit.cpp b/src/nodejs/unit-http/unit.cpp index c5bca49a..589eca3f 100644 --- a/src/nodejs/unit-http/unit.cpp +++ b/src/nodejs/unit-http/unit.cpp @@ -13,25 +13,140 @@ #include <nxt_unit_websocket.h> -static void delete_port_data(uv_handle_t* handle); - napi_ref Unit::constructor_; struct port_data_t { - nxt_unit_ctx_t *ctx; - nxt_unit_port_t *port; - uv_poll_t poll; + port_data_t(nxt_unit_ctx_t *c, nxt_unit_port_t *p); + + void process_port_msg(); + void stop(); + + template<typename T> + static port_data_t *get(T *handle); + + static void read_callback(uv_poll_t *handle, int status, int events); + static void timer_callback(uv_timer_t *handle); + static void delete_data(uv_handle_t* handle); + + nxt_unit_ctx_t *ctx; + nxt_unit_port_t *port; + uv_poll_t poll; + uv_timer_t timer; + int ref_count; + bool scheduled; + bool stopped; }; struct req_data_t { napi_ref sock_ref; + napi_ref req_ref; napi_ref resp_ref; napi_ref conn_ref; }; +port_data_t::port_data_t(nxt_unit_ctx_t *c, nxt_unit_port_t *p) : + ctx(c), port(p), ref_count(0), scheduled(false), stopped(false) +{ + timer.type = UV_UNKNOWN_HANDLE; +} + + +void +port_data_t::process_port_msg() +{ + int rc, err; + + rc = nxt_unit_process_port_msg(ctx, port); + + if (rc != NXT_UNIT_OK) { + return; + } + + if (timer.type == UV_UNKNOWN_HANDLE) { + err = uv_timer_init(poll.loop, &timer); + if (err < 0) { + nxt_unit_warn(ctx, "Failed to init uv.poll"); + return; + } + + ref_count++; + timer.data = this; + } + + if (!scheduled && !stopped) { + uv_timer_start(&timer, timer_callback, 0, 0); + + scheduled = true; + } +} + + +void +port_data_t::stop() +{ + stopped = true; + + uv_poll_stop(&poll); + + uv_close((uv_handle_t *) &poll, delete_data); + + if (timer.type == UV_UNKNOWN_HANDLE) { + return; + } + + uv_timer_stop(&timer); + + uv_close((uv_handle_t *) &timer, delete_data); +} + + +template<typename T> +port_data_t * +port_data_t::get(T *handle) +{ + return (port_data_t *) handle->data; +} + + +void +port_data_t::read_callback(uv_poll_t *handle, int status, int events) +{ + get(handle)->process_port_msg(); +} + + +void +port_data_t::timer_callback(uv_timer_t *handle) +{ + port_data_t *data; + + data = get(handle); + + data->scheduled = false; + if (data->stopped) { + return; + } + + data->process_port_msg(); +} + + +void +port_data_t::delete_data(uv_handle_t* handle) +{ + port_data_t *data; + + data = get(handle); + + if (--data->ref_count <= 0) { + delete data; + } +} + + Unit::Unit(napi_env env, napi_value jsthis): nxt_napi(env), wrapper_(wrap(jsthis, this, destroy)), @@ -65,6 +180,7 @@ Unit::init(napi_env env, napi_value exports) constructor_ = napi.create_reference(ctor); napi.set_named_property(exports, "Unit", ctor); + napi.set_named_property(exports, "request_read", request_read); napi.set_named_property(exports, "response_send_headers", response_send_headers); napi.set_named_property(exports, "response_write", response_write); @@ -206,7 +322,7 @@ Unit::request_handler(nxt_unit_request_info_t *req) server_obj = get_server_object(); socket = create_socket(server_obj, req); - request = create_request(server_obj, socket); + request = create_request(server_obj, socket, req); response = create_response(server_obj, request, req); create_headers(req, request); @@ -301,6 +417,7 @@ Unit::close_handler(nxt_unit_request_info_t *req) nxt_napi::create(0)); remove_wrap(req_data->sock_ref); + remove_wrap(req_data->req_ref); remove_wrap(req_data->resp_ref); remove_wrap(req_data->conn_ref); @@ -350,59 +467,50 @@ Unit::shm_ack_handler(nxt_unit_ctx_t *ctx) } -static void -nxt_uv_read_callback(uv_poll_t *handle, int status, int events) -{ - port_data_t *data; - - data = (port_data_t *) handle->data; - - nxt_unit_process_port_msg(data->ctx, data->port); -} - - int Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) { - int err; - Unit *obj; - uv_loop_t *loop; - port_data_t *data; - napi_status status; + int err; + Unit *obj; + uv_loop_t *loop; + port_data_t *data; + napi_status status; if (port->in_fd != -1) { - obj = reinterpret_cast<Unit *>(ctx->unit->data); - if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) { nxt_unit_warn(ctx, "fcntl(%d, O_NONBLOCK) failed: %s (%d)", port->in_fd, strerror(errno), errno); return -1; } + obj = reinterpret_cast<Unit *>(ctx->unit->data); + status = napi_get_uv_event_loop(obj->env(), &loop); if (status != napi_ok) { nxt_unit_warn(ctx, "Failed to get uv.loop"); return NXT_UNIT_ERROR; } - data = new port_data_t; + data = new port_data_t(ctx, port); err = uv_poll_init(loop, &data->poll, port->in_fd); if (err < 0) { nxt_unit_warn(ctx, "Failed to init uv.poll"); + delete data; return NXT_UNIT_ERROR; } - err = uv_poll_start(&data->poll, UV_READABLE, nxt_uv_read_callback); + err = uv_poll_start(&data->poll, UV_READABLE, + port_data_t::read_callback); if (err < 0) { nxt_unit_warn(ctx, "Failed to start uv.poll"); + delete data; return NXT_UNIT_ERROR; } port->data = data; - data->ctx = ctx; - data->port = port; + data->ref_count++; data->poll.data = data; } @@ -418,26 +526,11 @@ Unit::remove_port(nxt_unit_t *unit, nxt_unit_port_t *port) if (port->data != NULL) { data = (port_data_t *) port->data; - if (data->port == port) { - uv_poll_stop(&data->poll); - - uv_close((uv_handle_t *) &data->poll, delete_port_data); - } + data->stop(); } } -static void -delete_port_data(uv_handle_t* handle) -{ - port_data_t *data; - - data = (port_data_t *) handle->data; - - delete data; -} - - void Unit::quit_cb(nxt_unit_ctx_t *ctx) { @@ -488,9 +581,8 @@ Unit::get_server_object() void Unit::create_headers(nxt_unit_request_info_t *req, napi_value request) { - void *data; uint32_t i; - napi_value headers, raw_headers, buffer; + napi_value headers, raw_headers; napi_status status; nxt_unit_request_t *r; @@ -515,11 +607,6 @@ Unit::create_headers(nxt_unit_request_info_t *req, napi_value request) set_named_property(request, "url", r->target, r->target_length); set_named_property(request, "_websocket_handshake", r->websocket_handshake); - - buffer = create_buffer((size_t) req->content_length, &data); - nxt_unit_request_read(req, data, req->content_length); - - set_named_property(request, "_data", buffer); } @@ -577,13 +664,20 @@ Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req) napi_value -Unit::create_request(napi_value server_obj, napi_value socket) +Unit::create_request(napi_value server_obj, napi_value socket, + nxt_unit_request_info_t *req) { - napi_value constructor; + napi_value constructor, res; + req_data_t *req_data; constructor = get_named_property(server_obj, "ServerRequest"); - return new_instance(constructor, server_obj, socket); + res = new_instance(constructor, server_obj, socket); + + req_data = (req_data_t *) req->data; + req_data->req_ref = wrap(res, req, req_destroy); + + return res; } @@ -643,6 +737,47 @@ Unit::create_websocket_frame(napi_value server_obj, napi_value +Unit::request_read(napi_env env, napi_callback_info info) +{ + void *data; + uint32_t wm; + nxt_napi napi(env); + napi_value this_arg, argv, buffer; + nxt_unit_request_info_t *req; + + try { + this_arg = napi.get_cb_info(info, argv); + + try { + req = napi.get_request_info(this_arg); + + } catch (exception &e) { + return nullptr; + } + + if (req->content_length == 0) { + return nullptr; + } + + wm = napi.get_value_uint32(argv); + + if (wm > req->content_length) { + wm = req->content_length; + } + + buffer = napi.create_buffer((size_t) wm, &data); + nxt_unit_request_read(req, data, wm); + + } catch (exception &e) { + napi.throw_error(e); + return nullptr; + } + + return buffer; +} + + +napi_value Unit::response_send_headers(napi_env env, napi_callback_info info) { int ret; @@ -884,6 +1019,7 @@ Unit::response_end(napi_env env, napi_callback_info info) req_data = (req_data_t *) req->data; napi.remove_wrap(req_data->sock_ref); + napi.remove_wrap(req_data->req_ref); napi.remove_wrap(req_data->resp_ref); napi.remove_wrap(req_data->conn_ref); @@ -998,33 +1134,28 @@ Unit::websocket_set_sock(napi_env env, napi_callback_info info) void -Unit::conn_destroy(napi_env env, void *nativeObject, void *finalize_hint) +Unit::conn_destroy(napi_env env, void *r, void *finalize_hint) { - nxt_unit_request_info_t *req; - - req = (nxt_unit_request_info_t *) nativeObject; - - nxt_unit_warn(NULL, "conn_destroy: %p", req); + nxt_unit_req_debug(NULL, "conn_destroy: %p", r); } void -Unit::sock_destroy(napi_env env, void *nativeObject, void *finalize_hint) +Unit::sock_destroy(napi_env env, void *r, void *finalize_hint) { - nxt_unit_request_info_t *req; - - req = (nxt_unit_request_info_t *) nativeObject; - - nxt_unit_warn(NULL, "sock_destroy: %p", req); + nxt_unit_req_debug(NULL, "sock_destroy: %p", r); } void -Unit::resp_destroy(napi_env env, void *nativeObject, void *finalize_hint) +Unit::req_destroy(napi_env env, void *r, void *finalize_hint) { - nxt_unit_request_info_t *req; + nxt_unit_req_debug(NULL, "req_destroy: %p", r); +} - req = (nxt_unit_request_info_t *) nativeObject; - nxt_unit_warn(NULL, "resp_destroy: %p", req); +void +Unit::resp_destroy(napi_env env, void *r, void *finalize_hint) +{ + nxt_unit_req_debug(NULL, "resp_destroy: %p", r); } diff --git a/src/nodejs/unit-http/unit.h b/src/nodejs/unit-http/unit.h index 07823c26..4ef40d45 100644 --- a/src/nodejs/unit-http/unit.h +++ b/src/nodejs/unit-http/unit.h @@ -21,6 +21,7 @@ private: static void destroy(napi_env env, void *nativeObject, void *finalize_hint); static void conn_destroy(napi_env env, void *nativeObject, void *finalize_hint); static void sock_destroy(napi_env env, void *nativeObject, void *finalize_hint); + static void req_destroy(napi_env env, void *nativeObject, void *finalize_hint); static void resp_destroy(napi_env env, void *nativeObject, void *finalize_hint); static napi_value create_server(napi_env env, napi_callback_info info); @@ -50,7 +51,8 @@ private: napi_value create_socket(napi_value server_obj, nxt_unit_request_info_t *req); - napi_value create_request(napi_value server_obj, napi_value socket); + napi_value create_request(napi_value server_obj, napi_value socket, + nxt_unit_request_info_t *req); napi_value create_response(napi_value server_obj, napi_value request, nxt_unit_request_info_t *req); @@ -58,6 +60,8 @@ private: napi_value create_websocket_frame(napi_value server_obj, nxt_unit_websocket_frame_t *ws); + static napi_value request_read(napi_env env, napi_callback_info info); + static napi_value response_send_headers(napi_env env, napi_callback_info info); diff --git a/src/nxt_app_queue.h b/src/nxt_app_queue.h index 127cb8f3..a1cc2f11 100644 --- a/src/nxt_app_queue.h +++ b/src/nxt_app_queue.h @@ -23,7 +23,7 @@ typedef struct { typedef struct { - nxt_app_nncq_atomic_t nitems; + nxt_app_nncq_atomic_t notified; nxt_app_nncq_t free_items; nxt_app_nncq_t queue; nxt_app_queue_item_t items[NXT_APP_QUEUE_SIZE]; @@ -42,7 +42,7 @@ nxt_app_queue_init(nxt_app_queue_t volatile *q) nxt_app_nncq_enqueue(&q->free_items, i); } - q->nitems = 0; + q->notified = 0; } @@ -50,6 +50,7 @@ nxt_inline nxt_int_t nxt_app_queue_send(nxt_app_queue_t volatile *q, const void *p, uint8_t size, uint32_t tracking, int *notify, uint32_t *cookie) { + int n; nxt_app_queue_item_t *qi; nxt_app_nncq_atomic_t i; @@ -67,16 +68,23 @@ nxt_app_queue_send(nxt_app_queue_t volatile *q, const void *p, nxt_app_nncq_enqueue(&q->queue, i); - i = nxt_atomic_fetch_add(&q->nitems, 1); + n = nxt_atomic_cmp_set(&q->notified, 0, 1); if (notify != NULL) { - *notify = (i == 0); + *notify = n; } return NXT_OK; } +nxt_inline void +nxt_app_queue_notification_received(nxt_app_queue_t volatile *q) +{ + q->notified = 0; +} + + nxt_inline nxt_bool_t nxt_app_queue_cancel(nxt_app_queue_t volatile *q, uint32_t cookie, uint32_t tracking) @@ -110,8 +118,6 @@ nxt_app_queue_recv(nxt_app_queue_t volatile *q, void *p, uint32_t *cookie) nxt_app_nncq_enqueue(&q->free_items, i); - nxt_atomic_fetch_add(&q->nitems, -1); - return res; } diff --git a/src/nxt_application.h b/src/nxt_application.h index 5632f56f..632c5632 100644 --- a/src/nxt_application.h +++ b/src/nxt_application.h @@ -47,13 +47,13 @@ typedef struct { typedef struct { - char *home; - nxt_str_t path; - nxt_str_t module; - char *callable; - nxt_str_t protocol; - uint32_t threads; - uint32_t thread_stack_size; + char *home; + nxt_conf_value_t *path; + nxt_str_t module; + char *callable; + nxt_str_t protocol; + uint32_t threads; + uint32_t thread_stack_size; } nxt_python_app_conf_t; diff --git a/src/nxt_conf_validation.c b/src/nxt_conf_validation.c index acb2e3de..67fa3095 100644 --- a/src/nxt_conf_validation.c +++ b/src/nxt_conf_validation.c @@ -96,6 +96,10 @@ static nxt_int_t nxt_conf_vldt_return(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data); static nxt_int_t nxt_conf_vldt_proxy(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data); +static nxt_int_t nxt_conf_vldt_python_path(nxt_conf_validation_t *vldt, + nxt_conf_value_t *value, void *data); +static nxt_int_t nxt_conf_vldt_python_path_element(nxt_conf_validation_t *vldt, + nxt_conf_value_t *value); static nxt_int_t nxt_conf_vldt_python_protocol(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data); static nxt_int_t nxt_conf_vldt_threads(nxt_conf_validation_t *vldt, @@ -491,7 +495,8 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_python_members[] = { .type = NXT_CONF_VLDT_STRING, }, { .name = nxt_string("path"), - .type = NXT_CONF_VLDT_STRING, + .type = NXT_CONF_VLDT_STRING | NXT_CONF_VLDT_ARRAY, + .validator = nxt_conf_vldt_python_path, }, { .name = nxt_string("module"), .type = NXT_CONF_VLDT_STRING, @@ -1377,6 +1382,34 @@ nxt_conf_vldt_proxy(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, static nxt_int_t +nxt_conf_vldt_python_path(nxt_conf_validation_t *vldt, + nxt_conf_value_t *value, void *data) +{ + if (nxt_conf_type(value) == NXT_CONF_ARRAY) { + return nxt_conf_vldt_array_iterator(vldt, value, + &nxt_conf_vldt_python_path_element); + } + + /* NXT_CONF_STRING */ + + return NXT_OK; +} + + +static nxt_int_t +nxt_conf_vldt_python_path_element(nxt_conf_validation_t *vldt, + nxt_conf_value_t *value) +{ + if (nxt_conf_type(value) != NXT_CONF_STRING) { + return nxt_conf_vldt_error(vldt, "The \"path\" array must contain " + "only string values."); + } + + return NXT_OK; +} + + +static nxt_int_t nxt_conf_vldt_python_protocol(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data) { diff --git a/src/nxt_controller.c b/src/nxt_controller.c index 9a34a877..772d10c8 100644 --- a/src/nxt_controller.c +++ b/src/nxt_controller.c @@ -1686,7 +1686,10 @@ nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, static void nxt_controller_conf_store(nxt_task_t *task, nxt_conf_value_t *conf) { + void *mem; + u_char *end; size_t size; + nxt_fd_t fd; nxt_buf_t *b; nxt_port_t *main_port; nxt_runtime_t *rt; @@ -1697,14 +1700,38 @@ nxt_controller_conf_store(nxt_task_t *task, nxt_conf_value_t *conf) size = nxt_conf_json_length(conf, NULL); - b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size); + fd = nxt_shm_open(task, size); + if (nxt_slow_path(fd == -1)) { + return; + } + + mem = nxt_mem_mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + if (nxt_slow_path(mem == MAP_FAILED)) { + goto fail; + } + + end = nxt_conf_json_print(mem, conf, NULL); + + nxt_mem_munmap(mem, size); - if (nxt_fast_path(b != NULL)) { - b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL); + size = end - (u_char *) mem; - (void) nxt_port_socket_write(task, main_port, NXT_PORT_MSG_CONF_STORE, - -1, 0, -1, b); + b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, sizeof(size_t), 0); + if (nxt_slow_path(b == NULL)) { + goto fail; } + + b->mem.free = nxt_cpymem(b->mem.pos, &size, sizeof(size_t)); + + (void) nxt_port_socket_write(task, main_port, + NXT_PORT_MSG_CONF_STORE | NXT_PORT_MSG_CLOSE_FD, + fd, 0, -1, b); + + return; + +fail: + + nxt_fd_close(fd); } diff --git a/src/nxt_h1proto.c b/src/nxt_h1proto.c index dccbe56c..d3da6942 100644 --- a/src/nxt_h1proto.c +++ b/src/nxt_h1proto.c @@ -174,6 +174,8 @@ static nxt_http_field_proc_t nxt_h1p_fields[] = { { nxt_string("Content-Type"), &nxt_http_request_field, offsetof(nxt_http_request_t, content_type) }, { nxt_string("Content-Length"), &nxt_http_request_content_length, 0 }, + { nxt_string("Authorization"), &nxt_http_request_field, + offsetof(nxt_http_request_t, authorization) }, }; @@ -1151,19 +1153,19 @@ static const nxt_str_t nxt_http_client_error[] = { nxt_string("HTTP/1.1 415 Unsupported Media Type\r\n"), nxt_string("HTTP/1.1 416 Range Not Satisfiable\r\n"), nxt_string("HTTP/1.1 417 Expectation Failed\r\n"), - nxt_string("HTTP/1.1 418\r\n"), - nxt_string("HTTP/1.1 419\r\n"), - nxt_string("HTTP/1.1 420\r\n"), - nxt_string("HTTP/1.1 421\r\n"), - nxt_string("HTTP/1.1 422\r\n"), - nxt_string("HTTP/1.1 423\r\n"), - nxt_string("HTTP/1.1 424\r\n"), - nxt_string("HTTP/1.1 425\r\n"), + nxt_string("HTTP/1.1 418 I'm a teapot\r\n"), + nxt_string("HTTP/1.1 419 \r\n"), + nxt_string("HTTP/1.1 420 \r\n"), + nxt_string("HTTP/1.1 421 Misdirected Request\r\n"), + nxt_string("HTTP/1.1 422 Unprocessable Entity\r\n"), + nxt_string("HTTP/1.1 423 Locked\r\n"), + nxt_string("HTTP/1.1 424 Failed Dependency\r\n"), + nxt_string("HTTP/1.1 425 \r\n"), nxt_string("HTTP/1.1 426 Upgrade Required\r\n"), - nxt_string("HTTP/1.1 427\r\n"), - nxt_string("HTTP/1.1 428\r\n"), - nxt_string("HTTP/1.1 429\r\n"), - nxt_string("HTTP/1.1 430\r\n"), + nxt_string("HTTP/1.1 427 \r\n"), + nxt_string("HTTP/1.1 428 \r\n"), + nxt_string("HTTP/1.1 429 \r\n"), + nxt_string("HTTP/1.1 430 \r\n"), nxt_string("HTTP/1.1 431 Request Header Fields Too Large\r\n"), }; @@ -1190,7 +1192,7 @@ static const nxt_str_t nxt_http_server_error[] = { }; -#define UNKNOWN_STATUS_LENGTH nxt_length("HTTP/1.1 65536\r\n") +#define UNKNOWN_STATUS_LENGTH nxt_length("HTTP/1.1 999 \r\n") static void nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r, @@ -1248,13 +1250,16 @@ nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r, { status = &nxt_http_server_error[n - NXT_HTTP_INTERNAL_SERVER_ERROR]; - } else { - p = nxt_sprintf(buf, buf + UNKNOWN_STATUS_LENGTH, - "HTTP/1.1 %03d\r\n", n); + } else if (n <= NXT_HTTP_STATUS_MAX) { + (void) nxt_sprintf(buf, buf + UNKNOWN_STATUS_LENGTH, + "HTTP/1.1 %03d \r\n", n); - unknown_status.length = p - buf; + unknown_status.length = UNKNOWN_STATUS_LENGTH; unknown_status.start = buf; status = &unknown_status; + + } else { + status = &nxt_http_server_error[0]; } size = status->length; diff --git a/src/nxt_http.h b/src/nxt_http.h index 1418be95..e30bfeb4 100644 --- a/src/nxt_http.h +++ b/src/nxt_http.h @@ -156,6 +156,7 @@ struct nxt_http_request_s { nxt_http_field_t *cookie; nxt_http_field_t *referer; nxt_http_field_t *user_agent; + nxt_http_field_t *authorization; nxt_off_t content_length_n; nxt_sockaddr_t *remote; diff --git a/src/nxt_http_route.c b/src/nxt_http_route.c index 9aaa708e..28545fc9 100644 --- a/src/nxt_http_route.c +++ b/src/nxt_http_route.c @@ -813,6 +813,12 @@ nxt_http_route_ruleset_create(nxt_task_t *task, nxt_mp_t *mp, next = 0; + /* + * A workaround for GCC 10 with -flto -O2 flags that warns about "name" + * may be uninitialized in nxt_http_route_rule_name_create(). + */ + nxt_str_null(&name); + for (i = 0; i < n; i++) { rule_cv = nxt_conf_next_object_member(ruleset_cv, &name, &next); diff --git a/src/nxt_http_static.c b/src/nxt_http_static.c index 5687ef2c..df2655fc 100644 --- a/src/nxt_http_static.c +++ b/src/nxt_http_static.c @@ -395,12 +395,15 @@ static void nxt_http_static_buf_completion(nxt_task_t *task, void *obj, void *data) { ssize_t n, size; - nxt_buf_t *b, *fb; + nxt_buf_t *b, *fb, *next; nxt_off_t rest; nxt_http_request_t *r; b = obj; r = data; + +complete_buf: + fb = r->out; if (nxt_slow_path(fb == NULL || r->error)) { @@ -424,6 +427,8 @@ nxt_http_static_buf_completion(nxt_task_t *task, void *obj, void *data) goto clean; } + next = b->next; + if (n == rest) { nxt_file_close(task, fb->file); r->out = NULL; @@ -439,12 +444,24 @@ nxt_http_static_buf_completion(nxt_task_t *task, void *obj, void *data) b->mem.free = b->mem.pos + n; nxt_http_request_send(task, r, b); + + if (next != NULL) { + b = next; + goto complete_buf; + } + return; clean: - nxt_mp_free(r->mem_pool, b); - nxt_mp_release(r->mem_pool); + do { + next = b->next; + + nxt_mp_free(r->mem_pool, b); + nxt_mp_release(r->mem_pool); + + b = next; + } while (b != NULL); if (fb != NULL) { nxt_file_close(task, fb->file); diff --git a/src/nxt_isolation.c b/src/nxt_isolation.c index 1e6323bc..cab0074b 100644 --- a/src/nxt_isolation.c +++ b/src/nxt_isolation.c @@ -676,12 +676,6 @@ nxt_isolation_unmount_all(nxt_task_t *task, nxt_process_t *process) return; } -#if (NXT_HAVE_CLONE_NEWNS) - if (nxt_is_clone_flag_set(process->isolation.clone.flags, NEWNS)) { - return; - } -#endif - nxt_debug(task, "unmount all (%s)", process->name); automount = &process->isolation.automount; diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c index 0cde435b..f20f2c2c 100644 --- a/src/nxt_main_process.c +++ b/src/nxt_main_process.c @@ -182,7 +182,7 @@ static nxt_conf_map_t nxt_python_app_conf[] = { { nxt_string("path"), - NXT_CONF_MAP_STR, + NXT_CONF_MAP_PTR, offsetof(nxt_common_app_conf_t, u.python.path), }, @@ -1001,21 +1001,22 @@ nxt_main_port_socket_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_listening_socket_t ls; u_char message[2048]; + port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid, + msg->port_msg.reply_port); + if (nxt_slow_path(port == NULL)) { + return; + } + b = msg->buf; sa = (nxt_sockaddr_t *) b->mem.pos; /* TODO check b size and make plain */ - out = NULL; - ls.socket = -1; ls.error = NXT_SOCKET_ERROR_SYSTEM; ls.start = message; ls.end = message + sizeof(message); - port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid, - msg->port_msg.reply_port); - nxt_debug(task, "listening socket \"%*s\"", (size_t) sa->length, nxt_sockaddr_start(sa)); @@ -1025,6 +1026,8 @@ nxt_main_port_socket_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_debug(task, "socket(\"%*s\"): %d", (size_t) sa->length, nxt_sockaddr_start(sa), ls.socket); + out = NULL; + type = NXT_PORT_MSG_RPC_READY_LAST | NXT_PORT_MSG_CLOSE_FD; } else { @@ -1034,13 +1037,11 @@ nxt_main_port_socket_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) out = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size + 1); - if (nxt_slow_path(out == NULL)) { - return; - } + if (nxt_fast_path(out != NULL)) { + *out->mem.free++ = (uint8_t) ls.error; - *out->mem.free++ = (uint8_t) ls.error; - - out->mem.free = nxt_cpymem(out->mem.free, ls.start, size); + out->mem.free = nxt_cpymem(out->mem.free, ls.start, size); + } type = NXT_PORT_MSG_RPC_ERROR; } @@ -1408,12 +1409,45 @@ nxt_app_lang_compare(const void *v1, const void *v2) static void nxt_main_port_conf_store_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { - ssize_t n, size, offset; - nxt_buf_t *b; + void *p; + size_t size; + ssize_t n; nxt_int_t ret; nxt_file_t file; nxt_runtime_t *rt; + p = MAP_FAILED; + + /* + * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be + * initialized in 'cleanup' section. + */ + size = 0; + + if (nxt_slow_path(msg->fd[0] == -1)) { + nxt_alert(task, "conf_store_handler: invalid shm fd"); + goto error; + } + + if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) { + nxt_alert(task, "conf_store_handler: unexpected buffer size (%d)", + (int) nxt_buf_mem_used_size(&msg->buf->mem)); + goto error; + } + + nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t)); + + p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0); + + nxt_fd_close(msg->fd[0]); + msg->fd[0] = -1; + + if (nxt_slow_path(p == MAP_FAILED)) { + goto error; + } + + nxt_debug(task, "conf_store_handler(%uz): %*s", size, size, p); + nxt_memzero(&file, sizeof(nxt_file_t)); rt = task->thread->runtime; @@ -1427,33 +1461,35 @@ nxt_main_port_conf_store_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) goto error; } - offset = 0; - - for (b = msg->buf; b != NULL; b = b->next) { - size = nxt_buf_mem_used_size(&b->mem); - - n = nxt_file_write(&file, b->mem.pos, size, offset); + n = nxt_file_write(&file, p, size, 0); - if (nxt_slow_path(n != size)) { - nxt_file_close(task, &file); - (void) nxt_file_delete(file.name); - goto error; - } + nxt_file_close(task, &file); - offset += n; + if (nxt_slow_path(n != (ssize_t) size)) { + (void) nxt_file_delete(file.name); + goto error; } - nxt_file_close(task, &file); - ret = nxt_file_rename(file.name, (nxt_file_name_t *) rt->conf); if (nxt_fast_path(ret == NXT_OK)) { - return; + goto cleanup; } error: nxt_alert(task, "failed to store current configuration"); + +cleanup: + + if (p != MAP_FAILED) { + nxt_mem_munmap(p, size); + } + + if (msg->fd[0] != -1) { + nxt_fd_close(msg->fd[0]); + msg->fd[0] = -1; + } } diff --git a/src/nxt_php_sapi.c b/src/nxt_php_sapi.c index d2fbdd27..369e7f32 100644 --- a/src/nxt_php_sapi.c +++ b/src/nxt_php_sapi.c @@ -1038,6 +1038,17 @@ nxt_php_execute(nxt_php_run_ctx_t *ctx, nxt_unit_request_t *r) ctx->cookie = nxt_unit_sptr_get(&f->value); } + if (r->authorization_field != NXT_UNIT_NONE_FIELD) { + f = r->fields + r->authorization_field; + + php_handle_auth_data(nxt_unit_sptr_get(&f->value)); + + } else { + SG(request_info).auth_digest = NULL; + SG(request_info).auth_user = NULL; + SG(request_info).auth_password = NULL; + } + SG(sapi_headers).http_response_code = 200; SG(request_info).path_translated = NULL; diff --git a/src/nxt_port.c b/src/nxt_port.c index dbcdec11..d4e46564 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -8,6 +8,7 @@ #include <nxt_runtime.h> #include <nxt_port.h> #include <nxt_router.h> +#include <nxt_app_queue.h> #include <nxt_port_queue.h> @@ -84,6 +85,8 @@ nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid, void nxt_port_close(nxt_task_t *task, nxt_port_t *port) { + size_t size; + nxt_debug(task, "port %p %d:%d close, type %d", port, port->pid, port->id, port->type); @@ -109,7 +112,10 @@ nxt_port_close(nxt_task_t *task, nxt_port_t *port) } if (port->queue != NULL) { - nxt_mem_munmap(port->queue, sizeof(nxt_port_queue_t)); + size = (port->id == (nxt_port_id_t) -1) ? sizeof(nxt_app_queue_t) + : sizeof(nxt_port_queue_t); + nxt_mem_munmap(port->queue, size); + port->queue = NULL; } } diff --git a/src/nxt_process.h b/src/nxt_process.h index 7afb8803..4f24b179 100644 --- a/src/nxt_process.h +++ b/src/nxt_process.h @@ -107,8 +107,6 @@ struct nxt_process_s { nxt_port_mmaps_t incoming; - nxt_thread_mutex_t cp_mutex; - uint32_t stream; nxt_mp_t *mem_pool; diff --git a/src/nxt_router.c b/src/nxt_router.c index 9dd5c30e..03fe2a6c 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -699,26 +699,39 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) void *p; size_t size; nxt_int_t ret; + nxt_port_t *port; nxt_router_temp_conf_t *tmcf; + port = nxt_runtime_port_find(task->thread->runtime, + msg->port_msg.pid, + msg->port_msg.reply_port); + if (nxt_slow_path(port == NULL)) { + nxt_alert(task, "conf_data_handler: reply port not found"); + return; + } + + p = MAP_FAILED; + + /* + * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be + * initialized in 'cleanup' section. + */ + size = 0; + tmcf = nxt_router_temp_conf(task); if (nxt_slow_path(tmcf == NULL)) { - return; + goto fail; } if (nxt_slow_path(msg->fd[0] == -1)) { - nxt_alert(task, "conf_data_handler: invalid file shm fd"); - return; + nxt_alert(task, "conf_data_handler: invalid shm fd"); + goto fail; } if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) { nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)", (int) nxt_buf_mem_used_size(&msg->buf->mem)); - - nxt_fd_close(msg->fd[0]); - msg->fd[0] = -1; - - return; + goto fail; } nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t)); @@ -729,22 +742,14 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) msg->fd[0] = -1; if (nxt_slow_path(p == MAP_FAILED)) { - return; + goto fail; } nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p); tmcf->router_conf->router = nxt_router; tmcf->stream = msg->port_msg.stream; - tmcf->port = nxt_runtime_port_find(task->thread->runtime, - msg->port_msg.pid, - msg->port_msg.reply_port); - - if (nxt_slow_path(tmcf->port == NULL)) { - nxt_alert(task, "reply port not found"); - - goto fail; - } + tmcf->port = port; nxt_port_use(task, tmcf->port, 1); @@ -757,9 +762,27 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_router_conf_error(task, tmcf); } + goto cleanup; + fail: - nxt_mem_munmap(p, size); + nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1, + msg->port_msg.stream, 0, NULL); + + if (tmcf != NULL) { + nxt_mp_destroy(tmcf->mem_pool); + } + +cleanup: + + if (p != MAP_FAILED) { + nxt_mem_munmap(p, size); + } + + if (msg->fd[0] != -1) { + nxt_fd_close(msg->fd[0]); + msg->fd[0] = -1; + } } @@ -1586,6 +1609,8 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, ret = nxt_router_app_queue_init(task, port); if (nxt_slow_path(ret != NXT_OK)) { + nxt_port_write_close(port); + nxt_port_read_close(port); nxt_port_use(task, port, -1); return NXT_ERROR; } @@ -3771,7 +3796,10 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_buf_chain_add(&b, nxt_http_buf_last(r)); req_rpc_data->rpc_cancel = 0; - req_rpc_data->apr_action = NXT_APR_GOT_RESPONSE; + + if (req_rpc_data->apr_action == NXT_APR_REQUEST_FAILED) { + req_rpc_data->apr_action = NXT_APR_GOT_RESPONSE; + } nxt_request_rpc_data_unlink(task, req_rpc_data); @@ -5169,6 +5197,7 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r, req->content_length_field = NXT_UNIT_NONE_FIELD; req->content_type_field = NXT_UNIT_NONE_FIELD; req->cookie_field = NXT_UNIT_NONE_FIELD; + req->authorization_field = NXT_UNIT_NONE_FIELD; dst_field = req->fields; @@ -5193,6 +5222,9 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r, } else if (field == r->cookie) { req->cookie_field = dst_field - req->fields; + + } else if (field == r->authorization) { + req->authorization_field = dst_field - req->fields; } nxt_debug(task, "add field 0x%04Xd, %d, %d, %p : %d %p", @@ -5369,7 +5401,7 @@ nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_bool_t ack; nxt_process_t *process; nxt_free_map_t *m; - nxt_port_mmap_header_t *hdr; + nxt_port_mmap_handler_t *mmap_handler; nxt_debug(task, "oosm in %PI", msg->port_msg.pid); @@ -5390,8 +5422,13 @@ nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_thread_mutex_lock(&process->incoming.mutex); for (i = 0; i < process->incoming.size; i++) { - hdr = process->incoming.elts[i].mmap_handler->hdr; - m = hdr->free_map; + mmap_handler = process->incoming.elts[i].mmap_handler; + + if (nxt_slow_path(mmap_handler == NULL)) { + continue; + } + + m = mmap_handler->hdr->free_map; for (mi = 0; mi < MAX_FREE_IDX; mi++) { if (m[mi] != 0) { diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c index d9d986da..8a86d38a 100644 --- a/src/nxt_runtime.c +++ b/src/nxt_runtime.c @@ -1387,7 +1387,6 @@ nxt_runtime_process_new(nxt_runtime_t *rt) nxt_queue_init(&process->ports); nxt_thread_mutex_create(&process->incoming.mutex); - nxt_thread_mutex_create(&process->cp_mutex); process->use_count = 1; @@ -1408,7 +1407,6 @@ nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process) nxt_port_mmaps_destroy(&process->incoming, 1); nxt_thread_mutex_destroy(&process->incoming.mutex); - nxt_thread_mutex_destroy(&process->cp_mutex); /* processes from nxt_runtime_process_get() have no memory pool */ if (process->mem_pool != NULL) { diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 097f50d6..2fef17c5 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -784,8 +784,8 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, { int rc; int ready_fd, router_fd, read_in_fd, read_out_fd; - char *unit_init, *version_end; - long version_length; + char *unit_init, *version_end, *vars; + size_t version_length; int64_t ready_pid, router_pid, read_pid; uint32_t ready_stream, router_id, ready_id, read_id; @@ -797,21 +797,30 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, return NXT_UNIT_ERROR; } - nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init); + version_end = strchr(unit_init, ';'); + if (nxt_slow_path(version_end == NULL)) { + nxt_unit_alert(NULL, "Unit version not found in %s=\"%s\"", + NXT_UNIT_INIT_ENV, unit_init); - version_length = nxt_length(NXT_VERSION); + return NXT_UNIT_ERROR; + } - version_end = strchr(unit_init, ';'); - if (version_end == NULL - || version_end - unit_init != version_length - || memcmp(unit_init, NXT_VERSION, version_length) != 0) - { - nxt_unit_alert(NULL, "version check error"); + version_length = version_end - unit_init; + + rc = version_length != nxt_length(NXT_VERSION) + || memcmp(unit_init, NXT_VERSION, nxt_length(NXT_VERSION)); + + if (nxt_slow_path(rc != 0)) { + nxt_unit_alert(NULL, "versions mismatch: the Unit daemon has version " + "%.*s, while the app was compiled with libunit %s", + (int) version_length, unit_init, NXT_VERSION); return NXT_UNIT_ERROR; } - rc = sscanf(version_end + 1, + vars = version_end + 1; + + rc = sscanf(vars, "%"PRIu32";" "%"PRId64",%"PRIu32",%d;" "%"PRId64",%"PRIu32",%d;" @@ -823,12 +832,22 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, &read_pid, &read_id, &read_in_fd, &read_out_fd, log_fd, shm_limit); + if (nxt_slow_path(rc == EOF)) { + nxt_unit_alert(NULL, "sscanf(%s) failed: %s (%d) for %s env", + vars, strerror(errno), errno, NXT_UNIT_INIT_ENV); + + return NXT_UNIT_ERROR; + } + if (nxt_slow_path(rc != 13)) { - nxt_unit_alert(NULL, "failed to scan variables: %d", rc); + nxt_unit_alert(NULL, "invalid number of variables in %s env: " + "found %d of %d in %s", NXT_UNIT_INIT_ENV, rc, 13, vars); return NXT_UNIT_ERROR; } + nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init); + nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id); ready_port->in_fd = -1; @@ -3587,7 +3606,10 @@ nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx) return NXT_UNIT_ERROR; } - res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf); + do { + res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf); + } while (res == NXT_UNIT_AGAIN); + if (res == NXT_UNIT_ERROR) { nxt_unit_read_buf_release(ctx, rbuf); @@ -4994,7 +5016,6 @@ nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) int rc; nxt_unit_impl_t *lib; nxt_unit_read_buf_t *rbuf; - nxt_unit_ctx_impl_t *ctx_impl; rbuf = nxt_unit_read_buf_get(ctx); if (nxt_slow_path(rbuf == NULL)) { @@ -5002,9 +5023,6 @@ nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) } lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); - -retry: if (port == lib->shared_port) { rc = nxt_unit_shared_port_recv(ctx, port, rbuf); @@ -5030,15 +5048,6 @@ retry: nxt_unit_process_ready_req(ctx); - if (ctx_impl->online) { - rbuf = nxt_unit_read_buf_get(ctx); - if (nxt_slow_path(rbuf == NULL)) { - return NXT_UNIT_ERROR; - } - - goto retry; - } - return rc; } @@ -6073,7 +6082,10 @@ static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf) { - int res; + int res; + nxt_unit_port_impl_t *port_impl; + + port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); retry: @@ -6086,6 +6098,8 @@ retry: } if (nxt_unit_is_read_queue(rbuf)) { + nxt_app_queue_notification_received(port_impl->queue); + nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue", (int) port->id.pid, (int) port->id.id, (int) rbuf->size); diff --git a/src/nxt_unit_request.h b/src/nxt_unit_request.h index fede00d2..5dbf648d 100644 --- a/src/nxt_unit_request.h +++ b/src/nxt_unit_request.h @@ -31,6 +31,7 @@ struct nxt_unit_request_s { uint32_t content_length_field; uint32_t content_type_field; uint32_t cookie_field; + uint32_t authorization_field; uint64_t content_length; diff --git a/src/python/nxt_python.c b/src/python/nxt_python.c index faf0c0e1..d8204937 100644 --- a/src/python/nxt_python.c +++ b/src/python/nxt_python.c @@ -24,6 +24,7 @@ typedef struct { static nxt_int_t nxt_python_start(nxt_task_t *task, nxt_process_data_t *data); +static nxt_int_t nxt_python_set_path(nxt_task_t *task, nxt_conf_value_t *value); static int nxt_python_init_threads(nxt_python_app_conf_t *c); static int nxt_python_ready_handler(nxt_unit_ctx_t *ctx); static void *nxt_python_thread_func(void *main_ctx); @@ -67,7 +68,7 @@ nxt_python_start(nxt_task_t *task, nxt_process_data_t *data) int rc; char *nxt_py_module; size_t len; - PyObject *obj, *pypath, *module; + PyObject *obj, *module; nxt_str_t proto; const char *callable; nxt_unit_ctx_t *unit_ctx; @@ -162,38 +163,18 @@ nxt_python_start(nxt_task_t *task, nxt_process_data_t *data) } nxt_py_stderr_flush = PyObject_GetAttrString(obj, "flush"); + + /* obj is a Borrowed reference. */ + obj = NULL; + if (nxt_slow_path(nxt_py_stderr_flush == NULL)) { nxt_alert(task, "Python failed to get \"flush\" attribute of " "\"sys.stderr\" object"); goto fail; } - /* obj is a Borrowed reference. */ - - if (c->path.length > 0) { - obj = PyString_FromStringAndSize((char *) c->path.start, - c->path.length); - - if (nxt_slow_path(obj == NULL)) { - nxt_alert(task, "Python failed to create string object \"%V\"", - &c->path); - goto fail; - } - - pypath = PySys_GetObject((char *) "path"); - - if (nxt_slow_path(pypath == NULL)) { - nxt_alert(task, "Python failed to get \"sys.path\" list"); - goto fail; - } - - if (nxt_slow_path(PyList_Insert(pypath, 0, obj) != 0)) { - nxt_alert(task, "Python failed to insert \"%V\" into \"sys.path\"", - &c->path); - goto fail; - } - - Py_DECREF(obj); + if (nxt_slow_path(nxt_python_set_path(task, c->path) != NXT_OK)) { + goto fail; } obj = Py_BuildValue("[s]", "unit"); @@ -317,6 +298,74 @@ fail: } +static nxt_int_t +nxt_python_set_path(nxt_task_t *task, nxt_conf_value_t *value) +{ + int ret; + PyObject *path, *sys; + nxt_str_t str; + nxt_uint_t n; + nxt_conf_value_t *array; + + if (value == NULL) { + return NXT_OK; + } + + sys = PySys_GetObject((char *) "path"); + if (nxt_slow_path(sys == NULL)) { + nxt_alert(task, "Python failed to get \"sys.path\" list"); + return NXT_ERROR; + } + + /* sys is a Borrowed reference. */ + + if (nxt_conf_type(value) == NXT_CONF_STRING) { + n = 0; + goto value_is_string; + } + + /* NXT_CONF_ARRAY */ + array = value; + + n = nxt_conf_array_elements_count(array); + + while (n != 0) { + n--; + + /* + * Insertion in front of existing paths starting from the last element + * to preserve original order while giving priority to the values + * specified in the "path" option. + */ + + value = nxt_conf_get_array_element(array, n); + + value_is_string: + + nxt_conf_get_string(value, &str); + + path = PyString_FromStringAndSize((char *) str.start, str.length); + if (nxt_slow_path(path == NULL)) { + nxt_alert(task, "Python failed to create string object \"%V\"", + &str); + return NXT_ERROR; + } + + ret = PyList_Insert(sys, 0, path); + + Py_DECREF(path); + + if (nxt_slow_path(ret != 0)) { + nxt_alert(task, "Python failed to insert \"%V\" into \"sys.path\"", + &str); + return NXT_ERROR; + } + } + + return NXT_OK; +} + + static int nxt_python_init_threads(nxt_python_app_conf_t *c) { diff --git a/src/python/nxt_python_asgi.c b/src/python/nxt_python_asgi.c index 98aeedf4..a6f94507 100644 --- a/src/python/nxt_python_asgi.c +++ b/src/python/nxt_python_asgi.c @@ -1131,11 +1131,12 @@ nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx) static PyObject * nxt_py_asgi_port_read(PyObject *self, PyObject *args) { - int rc; - PyObject *arg; - Py_ssize_t n; - nxt_unit_ctx_t *ctx; - nxt_unit_port_t *port; + int rc; + PyObject *arg0, *arg1, *res; + Py_ssize_t n; + nxt_unit_ctx_t *ctx; + nxt_unit_port_t *port; + nxt_py_asgi_ctx_data_t *ctx_data; n = PyTuple_GET_SIZE(args); @@ -1147,31 +1148,45 @@ nxt_py_asgi_port_read(PyObject *self, PyObject *args) return PyErr_Format(PyExc_TypeError, "invalid number of arguments"); } - arg = PyTuple_GET_ITEM(args, 0); - if (nxt_slow_path(arg == NULL || PyLong_Check(arg) == 0)) { + arg0 = PyTuple_GET_ITEM(args, 0); + if (nxt_slow_path(arg0 == NULL || PyLong_Check(arg0) == 0)) { return PyErr_Format(PyExc_TypeError, "the first argument is not a long"); } - ctx = PyLong_AsVoidPtr(arg); + ctx = PyLong_AsVoidPtr(arg0); - arg = PyTuple_GET_ITEM(args, 1); - if (nxt_slow_path(arg == NULL || PyLong_Check(arg) == 0)) { + arg1 = PyTuple_GET_ITEM(args, 1); + if (nxt_slow_path(arg1 == NULL || PyLong_Check(arg1) == 0)) { return PyErr_Format(PyExc_TypeError, "the second argument is not a long"); } - port = PyLong_AsVoidPtr(arg); - - nxt_unit_debug(ctx, "asgi_port_read %p %p", ctx, port); + port = PyLong_AsVoidPtr(arg1); rc = nxt_unit_process_port_msg(ctx, port); + nxt_unit_debug(ctx, "asgi_port_read(%p,%p): %d", ctx, port, rc); + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { return PyErr_Format(PyExc_RuntimeError, "error processing port %d message", port->id.id); } + if (rc == NXT_UNIT_OK) { + ctx_data = ctx->data; + + res = PyObject_CallFunctionObjArgs(ctx_data->loop_call_soon, + nxt_py_port_read, + arg0, arg1, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_alert(ctx, "Python failed to call 'loop.call_soon'"); + nxt_python_print_exception(); + } + + Py_XDECREF(res); + } + Py_RETURN_NONE; } diff --git a/src/python/nxt_python_wsgi.c b/src/python/nxt_python_wsgi.c index da7b183c..77c45af5 100644 --- a/src/python/nxt_python_wsgi.c +++ b/src/python/nxt_python_wsgi.c @@ -59,6 +59,7 @@ static void nxt_python_wsgi_done(void); static void nxt_python_request_handler(nxt_unit_request_info_t *req); static PyObject *nxt_python_create_environ(nxt_python_app_conf_t *c); +static PyObject *nxt_python_copy_environ(nxt_unit_request_info_t *req); static PyObject *nxt_python_get_environ(nxt_python_ctx_t *pctx); static int nxt_python_add_sptr(nxt_python_ctx_t *pctx, PyObject *name, nxt_unit_sptr_t *sptr, uint32_t size); @@ -221,6 +222,7 @@ nxt_python_wsgi_ctx_data_alloc(void **pdata) } pctx->write = NULL; + pctx->environ = NULL; pctx->start_resp = PyCFunction_New(nxt_py_start_resp_method, (PyObject *) pctx); @@ -237,6 +239,11 @@ nxt_python_wsgi_ctx_data_alloc(void **pdata) goto fail; } + pctx->environ = nxt_python_copy_environ(NULL); + if (nxt_slow_path(pctx->environ == NULL)) { + goto fail; + } + *pdata = pctx; return NXT_UNIT_OK; @@ -258,6 +265,7 @@ nxt_python_wsgi_ctx_data_free(void *data) Py_XDECREF(pctx->start_resp); Py_XDECREF(pctx->write); + Py_XDECREF(pctx->environ); Py_XDECREF(pctx); } @@ -295,6 +303,7 @@ nxt_python_request_handler(nxt_unit_request_info_t *req) int rc; PyObject *environ, *args, *response, *iterator, *item; PyObject *close, *result; + nxt_bool_t prepare_environ; nxt_python_ctx_t *pctx; pctx = req->ctx->data; @@ -305,6 +314,19 @@ nxt_python_request_handler(nxt_unit_request_info_t *req) PyEval_RestoreThread(pctx->thread_state); + if (nxt_slow_path(pctx->environ == NULL)) { + pctx->environ = nxt_python_copy_environ(req); + + if (pctx->environ == NULL) { + prepare_environ = 0; + + rc = NXT_UNIT_ERROR; + goto done; + } + } + + prepare_environ = 1; + environ = nxt_python_get_environ(pctx); if (nxt_slow_path(environ == NULL)) { rc = NXT_UNIT_ERROR; @@ -418,6 +440,14 @@ done: pctx->req = NULL; nxt_unit_request_done(req, rc); + + if (nxt_fast_path(prepare_environ)) { + PyEval_RestoreThread(pctx->thread_state); + + pctx->environ = nxt_python_copy_environ(NULL); + + pctx->thread_state = PyEval_SaveThread(); + } } @@ -532,23 +562,30 @@ fail: static PyObject * -nxt_python_get_environ(nxt_python_ctx_t *pctx) +nxt_python_copy_environ(nxt_unit_request_info_t *req) { - int rc; - uint32_t i, j, vl; - PyObject *environ; - nxt_unit_field_t *f, *f2; - nxt_unit_request_t *r; + PyObject *environ; environ = PyDict_Copy(nxt_py_environ_ptyp); + if (nxt_slow_path(environ == NULL)) { - nxt_unit_req_error(pctx->req, + nxt_unit_req_alert(req, "Python failed to copy the \"environ\" dictionary"); - - return NULL; + nxt_python_print_exception(); } - pctx->environ = environ; + return environ; +} + + +static PyObject * +nxt_python_get_environ(nxt_python_ctx_t *pctx) +{ + int rc; + uint32_t i, j, vl; + PyObject *environ; + nxt_unit_field_t *f, *f2; + nxt_unit_request_t *r; r = pctx->req->request; @@ -628,7 +665,7 @@ nxt_python_get_environ(nxt_python_ctx_t *pctx) #undef RC - if (nxt_slow_path(PyDict_SetItem(environ, nxt_py_wsgi_input_str, + if (nxt_slow_path(PyDict_SetItem(pctx->environ, nxt_py_wsgi_input_str, (PyObject *) pctx) != 0)) { nxt_unit_req_error(pctx->req, @@ -636,11 +673,15 @@ nxt_python_get_environ(nxt_python_ctx_t *pctx) goto fail; } + environ = pctx->environ; + pctx->environ = NULL; + return environ; fail: - Py_DECREF(environ); + Py_DECREF(pctx->environ); + pctx->environ = NULL; return NULL; } diff --git a/src/ruby/nxt_ruby.c b/src/ruby/nxt_ruby.c index 698d4a43..0aad887d 100644 --- a/src/ruby/nxt_ruby.c +++ b/src/ruby/nxt_ruby.c @@ -38,6 +38,7 @@ static int nxt_ruby_init_io(nxt_ruby_ctx_t *rctx); static void nxt_ruby_request_handler(nxt_unit_request_info_t *req); static void *nxt_ruby_request_handler_gvl(void *req); static int nxt_ruby_ready_handler(nxt_unit_ctx_t *ctx); +static void *nxt_ruby_thread_create_gvl(void *rctx); static VALUE nxt_ruby_thread_func(VALUE arg); static void *nxt_ruby_unit_run(void *ctx); static void nxt_ruby_ubf(void *ctx); @@ -1141,7 +1142,7 @@ nxt_ruby_ready_handler(nxt_unit_ctx_t *ctx) rctx->ctx = ctx; - res = rb_thread_create(RUBY_METHOD_FUNC(nxt_ruby_thread_func), rctx); + res = (VALUE) rb_thread_call_with_gvl(nxt_ruby_thread_create_gvl, rctx); if (nxt_fast_path(res != Qnil)) { nxt_unit_debug(ctx, "thread #%d created", (int) (i + 1)); @@ -1159,6 +1160,17 @@ nxt_ruby_ready_handler(nxt_unit_ctx_t *ctx) } +static void * +nxt_ruby_thread_create_gvl(void *rctx) +{ + VALUE res; + + res = rb_thread_create(RUBY_METHOD_FUNC(nxt_ruby_thread_func), rctx); + + return (void *) (uintptr_t) res; +} + + static VALUE nxt_ruby_thread_func(VALUE arg) { |