summaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorAndrei Belov <defan@nginx.com>2021-02-04 18:40:45 +0300
committerAndrei Belov <defan@nginx.com>2021-02-04 18:40:45 +0300
commit0997fa324ca523ab282f595ac9f44b3e4daff86a (patch)
tree37424fff265780f34f9a9adb7ddd7501a67843f1 /src
parent2bc99c614d5547e773bda73364efada47f0a37bf (diff)
parent774a6034d9daf32ac6c98da7e4c0ca9e820536b4 (diff)
downloadunit-0997fa324ca523ab282f595ac9f44b3e4daff86a.tar.gz
unit-0997fa324ca523ab282f595ac9f44b3e4daff86a.tar.bz2
Merged with the default branch.
Diffstat (limited to '')
-rw-r--r--src/nodejs/unit-http/http_server.js50
-rw-r--r--src/nodejs/unit-http/unit.cpp271
-rw-r--r--src/nodejs/unit-http/unit.h6
-rw-r--r--src/nxt_app_queue.h18
-rw-r--r--src/nxt_application.h14
-rw-r--r--src/nxt_conf_validation.c35
-rw-r--r--src/nxt_controller.c37
-rw-r--r--src/nxt_h1proto.c39
-rw-r--r--src/nxt_http.h1
-rw-r--r--src/nxt_http_route.c6
-rw-r--r--src/nxt_http_static.c23
-rw-r--r--src/nxt_isolation.c6
-rw-r--r--src/nxt_main_process.c94
-rw-r--r--src/nxt_php_sapi.c11
-rw-r--r--src/nxt_port.c8
-rw-r--r--src/nxt_process.h2
-rw-r--r--src/nxt_router.c83
-rw-r--r--src/nxt_runtime.c2
-rw-r--r--src/nxt_unit.c68
-rw-r--r--src/nxt_unit_request.h1
-rw-r--r--src/python/nxt_python.c103
-rw-r--r--src/python/nxt_python_asgi.c41
-rw-r--r--src/python/nxt_python_wsgi.c65
-rw-r--r--src/ruby/nxt_ruby.c14
24 files changed, 713 insertions, 285 deletions
diff --git a/src/nodejs/unit-http/http_server.js b/src/nodejs/unit-http/http_server.js
index d378e410..e59296ae 100644
--- a/src/nodejs/unit-http/http_server.js
+++ b/src/nodejs/unit-http/http_server.js
@@ -11,6 +11,7 @@ const util = require('util');
const unit_lib = require('./build/Release/unit-http');
const Socket = require('./socket');
const WebSocketFrame = require('./websocket_frame');
+const Readable = require('stream').Readable;
function ServerResponse(req) {
@@ -23,6 +24,7 @@ function ServerResponse(req) {
req._response = this;
this.socket = req.socket;
this.connection = req.connection;
+ this.writable = true;
}
util.inherits(ServerResponse, EventEmitter);
@@ -268,6 +270,7 @@ ServerResponse.prototype._writeBody = function(chunk, encoding, callback) {
res = this._write(chunk, 0, contentLength);
if (res < contentLength) {
this.socket.writable = false;
+ this.writable = false;
o = new BufferedOutput(this, res, chunk, encoding, callback);
this.server._output.push(o);
@@ -328,6 +331,8 @@ ServerResponse.prototype.end = function end(chunk, encoding, callback) {
if (typeof callback === 'function') {
callback();
}
+
+ this.emit("finish");
});
this.finished = true;
@@ -337,15 +342,14 @@ ServerResponse.prototype.end = function end(chunk, encoding, callback) {
};
function ServerRequest(server, socket) {
- EventEmitter.call(this);
+ Readable.call(this);
this.server = server;
this.socket = socket;
this.connection = socket;
+ this._pushed_eofchunk = false;
}
-util.inherits(ServerRequest, EventEmitter);
-
-ServerRequest.prototype.unpipe = undefined;
+util.inherits(ServerRequest, Readable);
ServerRequest.prototype.setTimeout = function setTimeout(msecs, callback) {
this.timeout = msecs;
@@ -377,35 +381,21 @@ ServerRequest.prototype.STATUS_CODES = function STATUS_CODES() {
return http.STATUS_CODES;
};
-ServerRequest.prototype.listeners = function listeners() {
- return [];
-};
-
-ServerRequest.prototype.resume = function resume() {
- return [];
-};
+ServerRequest.prototype._request_read = unit_lib.request_read;
-/*
- * The "on" method is overridden to defer reading data until user code is
- * ready, that is (ev === "data"). This can occur after req.emit("end") is
- * executed, since the user code can be scheduled asynchronously by Promises
- * and so on. Passing the data is postponed by process.nextTick() until
- * the "on" method caller completes.
- */
-ServerRequest.prototype.on = function on(ev, fn) {
- Server.prototype.on.call(this, ev, fn);
+ServerRequest.prototype._read = function _read(n) {
+ const b = this._request_read(n);
- if (ev === "data") {
- process.nextTick(function () {
- if (this._data.length !== 0) {
- this.emit("data", this._data);
- }
+ if (b != null) {
+ this.push(b);
+ }
- }.bind(this));
+ if (!this._pushed_eofchunk && (b == null || b.length < n)) {
+ this._pushed_eofchunk = true;
+ this.push(null);
}
};
-ServerRequest.prototype.addListener = ServerRequest.prototype.on;
function Server(requestListener) {
EventEmitter.call(this);
@@ -472,11 +462,6 @@ Server.prototype.emit_request = function (req, res) {
} else {
this.emit("request", req, res);
}
-
- process.nextTick(() => {
- req.emit("finish");
- req.emit("end");
- });
};
Server.prototype.emit_close = function () {
@@ -523,6 +508,7 @@ Server.prototype.emit_drain = function () {
}
resp.socket.writable = true;
+ resp.writable = true;
process.nextTick(() => {
resp.emit("drain");
diff --git a/src/nodejs/unit-http/unit.cpp b/src/nodejs/unit-http/unit.cpp
index c5bca49a..589eca3f 100644
--- a/src/nodejs/unit-http/unit.cpp
+++ b/src/nodejs/unit-http/unit.cpp
@@ -13,25 +13,140 @@
#include <nxt_unit_websocket.h>
-static void delete_port_data(uv_handle_t* handle);
-
napi_ref Unit::constructor_;
struct port_data_t {
- nxt_unit_ctx_t *ctx;
- nxt_unit_port_t *port;
- uv_poll_t poll;
+ port_data_t(nxt_unit_ctx_t *c, nxt_unit_port_t *p);
+
+ void process_port_msg();
+ void stop();
+
+ template<typename T>
+ static port_data_t *get(T *handle);
+
+ static void read_callback(uv_poll_t *handle, int status, int events);
+ static void timer_callback(uv_timer_t *handle);
+ static void delete_data(uv_handle_t* handle);
+
+ nxt_unit_ctx_t *ctx;
+ nxt_unit_port_t *port;
+ uv_poll_t poll;
+ uv_timer_t timer;
+ int ref_count;
+ bool scheduled;
+ bool stopped;
};
struct req_data_t {
napi_ref sock_ref;
+ napi_ref req_ref;
napi_ref resp_ref;
napi_ref conn_ref;
};
+port_data_t::port_data_t(nxt_unit_ctx_t *c, nxt_unit_port_t *p) :
+ ctx(c), port(p), ref_count(0), scheduled(false), stopped(false)
+{
+ timer.type = UV_UNKNOWN_HANDLE;
+}
+
+
+void
+port_data_t::process_port_msg()
+{
+ int rc, err;
+
+ rc = nxt_unit_process_port_msg(ctx, port);
+
+ if (rc != NXT_UNIT_OK) {
+ return;
+ }
+
+ if (timer.type == UV_UNKNOWN_HANDLE) {
+ err = uv_timer_init(poll.loop, &timer);
+ if (err < 0) {
+ nxt_unit_warn(ctx, "Failed to init uv.poll");
+ return;
+ }
+
+ ref_count++;
+ timer.data = this;
+ }
+
+ if (!scheduled && !stopped) {
+ uv_timer_start(&timer, timer_callback, 0, 0);
+
+ scheduled = true;
+ }
+}
+
+
+void
+port_data_t::stop()
+{
+ stopped = true;
+
+ uv_poll_stop(&poll);
+
+ uv_close((uv_handle_t *) &poll, delete_data);
+
+ if (timer.type == UV_UNKNOWN_HANDLE) {
+ return;
+ }
+
+ uv_timer_stop(&timer);
+
+ uv_close((uv_handle_t *) &timer, delete_data);
+}
+
+
+template<typename T>
+port_data_t *
+port_data_t::get(T *handle)
+{
+ return (port_data_t *) handle->data;
+}
+
+
+void
+port_data_t::read_callback(uv_poll_t *handle, int status, int events)
+{
+ get(handle)->process_port_msg();
+}
+
+
+void
+port_data_t::timer_callback(uv_timer_t *handle)
+{
+ port_data_t *data;
+
+ data = get(handle);
+
+ data->scheduled = false;
+ if (data->stopped) {
+ return;
+ }
+
+ data->process_port_msg();
+}
+
+
+void
+port_data_t::delete_data(uv_handle_t* handle)
+{
+ port_data_t *data;
+
+ data = get(handle);
+
+ if (--data->ref_count <= 0) {
+ delete data;
+ }
+}
+
+
Unit::Unit(napi_env env, napi_value jsthis):
nxt_napi(env),
wrapper_(wrap(jsthis, this, destroy)),
@@ -65,6 +180,7 @@ Unit::init(napi_env env, napi_value exports)
constructor_ = napi.create_reference(ctor);
napi.set_named_property(exports, "Unit", ctor);
+ napi.set_named_property(exports, "request_read", request_read);
napi.set_named_property(exports, "response_send_headers",
response_send_headers);
napi.set_named_property(exports, "response_write", response_write);
@@ -206,7 +322,7 @@ Unit::request_handler(nxt_unit_request_info_t *req)
server_obj = get_server_object();
socket = create_socket(server_obj, req);
- request = create_request(server_obj, socket);
+ request = create_request(server_obj, socket, req);
response = create_response(server_obj, request, req);
create_headers(req, request);
@@ -301,6 +417,7 @@ Unit::close_handler(nxt_unit_request_info_t *req)
nxt_napi::create(0));
remove_wrap(req_data->sock_ref);
+ remove_wrap(req_data->req_ref);
remove_wrap(req_data->resp_ref);
remove_wrap(req_data->conn_ref);
@@ -350,59 +467,50 @@ Unit::shm_ack_handler(nxt_unit_ctx_t *ctx)
}
-static void
-nxt_uv_read_callback(uv_poll_t *handle, int status, int events)
-{
- port_data_t *data;
-
- data = (port_data_t *) handle->data;
-
- nxt_unit_process_port_msg(data->ctx, data->port);
-}
-
-
int
Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
{
- int err;
- Unit *obj;
- uv_loop_t *loop;
- port_data_t *data;
- napi_status status;
+ int err;
+ Unit *obj;
+ uv_loop_t *loop;
+ port_data_t *data;
+ napi_status status;
if (port->in_fd != -1) {
- obj = reinterpret_cast<Unit *>(ctx->unit->data);
-
if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) {
nxt_unit_warn(ctx, "fcntl(%d, O_NONBLOCK) failed: %s (%d)",
port->in_fd, strerror(errno), errno);
return -1;
}
+ obj = reinterpret_cast<Unit *>(ctx->unit->data);
+
status = napi_get_uv_event_loop(obj->env(), &loop);
if (status != napi_ok) {
nxt_unit_warn(ctx, "Failed to get uv.loop");
return NXT_UNIT_ERROR;
}
- data = new port_data_t;
+ data = new port_data_t(ctx, port);
err = uv_poll_init(loop, &data->poll, port->in_fd);
if (err < 0) {
nxt_unit_warn(ctx, "Failed to init uv.poll");
+ delete data;
return NXT_UNIT_ERROR;
}
- err = uv_poll_start(&data->poll, UV_READABLE, nxt_uv_read_callback);
+ err = uv_poll_start(&data->poll, UV_READABLE,
+ port_data_t::read_callback);
if (err < 0) {
nxt_unit_warn(ctx, "Failed to start uv.poll");
+ delete data;
return NXT_UNIT_ERROR;
}
port->data = data;
- data->ctx = ctx;
- data->port = port;
+ data->ref_count++;
data->poll.data = data;
}
@@ -418,26 +526,11 @@ Unit::remove_port(nxt_unit_t *unit, nxt_unit_port_t *port)
if (port->data != NULL) {
data = (port_data_t *) port->data;
- if (data->port == port) {
- uv_poll_stop(&data->poll);
-
- uv_close((uv_handle_t *) &data->poll, delete_port_data);
- }
+ data->stop();
}
}
-static void
-delete_port_data(uv_handle_t* handle)
-{
- port_data_t *data;
-
- data = (port_data_t *) handle->data;
-
- delete data;
-}
-
-
void
Unit::quit_cb(nxt_unit_ctx_t *ctx)
{
@@ -488,9 +581,8 @@ Unit::get_server_object()
void
Unit::create_headers(nxt_unit_request_info_t *req, napi_value request)
{
- void *data;
uint32_t i;
- napi_value headers, raw_headers, buffer;
+ napi_value headers, raw_headers;
napi_status status;
nxt_unit_request_t *r;
@@ -515,11 +607,6 @@ Unit::create_headers(nxt_unit_request_info_t *req, napi_value request)
set_named_property(request, "url", r->target, r->target_length);
set_named_property(request, "_websocket_handshake", r->websocket_handshake);
-
- buffer = create_buffer((size_t) req->content_length, &data);
- nxt_unit_request_read(req, data, req->content_length);
-
- set_named_property(request, "_data", buffer);
}
@@ -577,13 +664,20 @@ Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req)
napi_value
-Unit::create_request(napi_value server_obj, napi_value socket)
+Unit::create_request(napi_value server_obj, napi_value socket,
+ nxt_unit_request_info_t *req)
{
- napi_value constructor;
+ napi_value constructor, res;
+ req_data_t *req_data;
constructor = get_named_property(server_obj, "ServerRequest");
- return new_instance(constructor, server_obj, socket);
+ res = new_instance(constructor, server_obj, socket);
+
+ req_data = (req_data_t *) req->data;
+ req_data->req_ref = wrap(res, req, req_destroy);
+
+ return res;
}
@@ -643,6 +737,47 @@ Unit::create_websocket_frame(napi_value server_obj,
napi_value
+Unit::request_read(napi_env env, napi_callback_info info)
+{
+ void *data;
+ uint32_t wm;
+ nxt_napi napi(env);
+ napi_value this_arg, argv, buffer;
+ nxt_unit_request_info_t *req;
+
+ try {
+ this_arg = napi.get_cb_info(info, argv);
+
+ try {
+ req = napi.get_request_info(this_arg);
+
+ } catch (exception &e) {
+ return nullptr;
+ }
+
+ if (req->content_length == 0) {
+ return nullptr;
+ }
+
+ wm = napi.get_value_uint32(argv);
+
+ if (wm > req->content_length) {
+ wm = req->content_length;
+ }
+
+ buffer = napi.create_buffer((size_t) wm, &data);
+ nxt_unit_request_read(req, data, wm);
+
+ } catch (exception &e) {
+ napi.throw_error(e);
+ return nullptr;
+ }
+
+ return buffer;
+}
+
+
+napi_value
Unit::response_send_headers(napi_env env, napi_callback_info info)
{
int ret;
@@ -884,6 +1019,7 @@ Unit::response_end(napi_env env, napi_callback_info info)
req_data = (req_data_t *) req->data;
napi.remove_wrap(req_data->sock_ref);
+ napi.remove_wrap(req_data->req_ref);
napi.remove_wrap(req_data->resp_ref);
napi.remove_wrap(req_data->conn_ref);
@@ -998,33 +1134,28 @@ Unit::websocket_set_sock(napi_env env, napi_callback_info info)
void
-Unit::conn_destroy(napi_env env, void *nativeObject, void *finalize_hint)
+Unit::conn_destroy(napi_env env, void *r, void *finalize_hint)
{
- nxt_unit_request_info_t *req;
-
- req = (nxt_unit_request_info_t *) nativeObject;
-
- nxt_unit_warn(NULL, "conn_destroy: %p", req);
+ nxt_unit_req_debug(NULL, "conn_destroy: %p", r);
}
void
-Unit::sock_destroy(napi_env env, void *nativeObject, void *finalize_hint)
+Unit::sock_destroy(napi_env env, void *r, void *finalize_hint)
{
- nxt_unit_request_info_t *req;
-
- req = (nxt_unit_request_info_t *) nativeObject;
-
- nxt_unit_warn(NULL, "sock_destroy: %p", req);
+ nxt_unit_req_debug(NULL, "sock_destroy: %p", r);
}
void
-Unit::resp_destroy(napi_env env, void *nativeObject, void *finalize_hint)
+Unit::req_destroy(napi_env env, void *r, void *finalize_hint)
{
- nxt_unit_request_info_t *req;
+ nxt_unit_req_debug(NULL, "req_destroy: %p", r);
+}
- req = (nxt_unit_request_info_t *) nativeObject;
- nxt_unit_warn(NULL, "resp_destroy: %p", req);
+void
+Unit::resp_destroy(napi_env env, void *r, void *finalize_hint)
+{
+ nxt_unit_req_debug(NULL, "resp_destroy: %p", r);
}
diff --git a/src/nodejs/unit-http/unit.h b/src/nodejs/unit-http/unit.h
index 07823c26..4ef40d45 100644
--- a/src/nodejs/unit-http/unit.h
+++ b/src/nodejs/unit-http/unit.h
@@ -21,6 +21,7 @@ private:
static void destroy(napi_env env, void *nativeObject, void *finalize_hint);
static void conn_destroy(napi_env env, void *nativeObject, void *finalize_hint);
static void sock_destroy(napi_env env, void *nativeObject, void *finalize_hint);
+ static void req_destroy(napi_env env, void *nativeObject, void *finalize_hint);
static void resp_destroy(napi_env env, void *nativeObject, void *finalize_hint);
static napi_value create_server(napi_env env, napi_callback_info info);
@@ -50,7 +51,8 @@ private:
napi_value create_socket(napi_value server_obj,
nxt_unit_request_info_t *req);
- napi_value create_request(napi_value server_obj, napi_value socket);
+ napi_value create_request(napi_value server_obj, napi_value socket,
+ nxt_unit_request_info_t *req);
napi_value create_response(napi_value server_obj, napi_value request,
nxt_unit_request_info_t *req);
@@ -58,6 +60,8 @@ private:
napi_value create_websocket_frame(napi_value server_obj,
nxt_unit_websocket_frame_t *ws);
+ static napi_value request_read(napi_env env, napi_callback_info info);
+
static napi_value response_send_headers(napi_env env,
napi_callback_info info);
diff --git a/src/nxt_app_queue.h b/src/nxt_app_queue.h
index 127cb8f3..a1cc2f11 100644
--- a/src/nxt_app_queue.h
+++ b/src/nxt_app_queue.h
@@ -23,7 +23,7 @@ typedef struct {
typedef struct {
- nxt_app_nncq_atomic_t nitems;
+ nxt_app_nncq_atomic_t notified;
nxt_app_nncq_t free_items;
nxt_app_nncq_t queue;
nxt_app_queue_item_t items[NXT_APP_QUEUE_SIZE];
@@ -42,7 +42,7 @@ nxt_app_queue_init(nxt_app_queue_t volatile *q)
nxt_app_nncq_enqueue(&q->free_items, i);
}
- q->nitems = 0;
+ q->notified = 0;
}
@@ -50,6 +50,7 @@ nxt_inline nxt_int_t
nxt_app_queue_send(nxt_app_queue_t volatile *q, const void *p,
uint8_t size, uint32_t tracking, int *notify, uint32_t *cookie)
{
+ int n;
nxt_app_queue_item_t *qi;
nxt_app_nncq_atomic_t i;
@@ -67,16 +68,23 @@ nxt_app_queue_send(nxt_app_queue_t volatile *q, const void *p,
nxt_app_nncq_enqueue(&q->queue, i);
- i = nxt_atomic_fetch_add(&q->nitems, 1);
+ n = nxt_atomic_cmp_set(&q->notified, 0, 1);
if (notify != NULL) {
- *notify = (i == 0);
+ *notify = n;
}
return NXT_OK;
}
+nxt_inline void
+nxt_app_queue_notification_received(nxt_app_queue_t volatile *q)
+{
+ q->notified = 0;
+}
+
+
nxt_inline nxt_bool_t
nxt_app_queue_cancel(nxt_app_queue_t volatile *q, uint32_t cookie,
uint32_t tracking)
@@ -110,8 +118,6 @@ nxt_app_queue_recv(nxt_app_queue_t volatile *q, void *p, uint32_t *cookie)
nxt_app_nncq_enqueue(&q->free_items, i);
- nxt_atomic_fetch_add(&q->nitems, -1);
-
return res;
}
diff --git a/src/nxt_application.h b/src/nxt_application.h
index 5632f56f..632c5632 100644
--- a/src/nxt_application.h
+++ b/src/nxt_application.h
@@ -47,13 +47,13 @@ typedef struct {
typedef struct {
- char *home;
- nxt_str_t path;
- nxt_str_t module;
- char *callable;
- nxt_str_t protocol;
- uint32_t threads;
- uint32_t thread_stack_size;
+ char *home;
+ nxt_conf_value_t *path;
+ nxt_str_t module;
+ char *callable;
+ nxt_str_t protocol;
+ uint32_t threads;
+ uint32_t thread_stack_size;
} nxt_python_app_conf_t;
diff --git a/src/nxt_conf_validation.c b/src/nxt_conf_validation.c
index acb2e3de..67fa3095 100644
--- a/src/nxt_conf_validation.c
+++ b/src/nxt_conf_validation.c
@@ -96,6 +96,10 @@ static nxt_int_t nxt_conf_vldt_return(nxt_conf_validation_t *vldt,
nxt_conf_value_t *value, void *data);
static nxt_int_t nxt_conf_vldt_proxy(nxt_conf_validation_t *vldt,
nxt_conf_value_t *value, void *data);
+static nxt_int_t nxt_conf_vldt_python_path(nxt_conf_validation_t *vldt,
+ nxt_conf_value_t *value, void *data);
+static nxt_int_t nxt_conf_vldt_python_path_element(nxt_conf_validation_t *vldt,
+ nxt_conf_value_t *value);
static nxt_int_t nxt_conf_vldt_python_protocol(nxt_conf_validation_t *vldt,
nxt_conf_value_t *value, void *data);
static nxt_int_t nxt_conf_vldt_threads(nxt_conf_validation_t *vldt,
@@ -491,7 +495,8 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_python_members[] = {
.type = NXT_CONF_VLDT_STRING,
}, {
.name = nxt_string("path"),
- .type = NXT_CONF_VLDT_STRING,
+ .type = NXT_CONF_VLDT_STRING | NXT_CONF_VLDT_ARRAY,
+ .validator = nxt_conf_vldt_python_path,
}, {
.name = nxt_string("module"),
.type = NXT_CONF_VLDT_STRING,
@@ -1377,6 +1382,34 @@ nxt_conf_vldt_proxy(nxt_conf_validation_t *vldt, nxt_conf_value_t *value,
static nxt_int_t
+nxt_conf_vldt_python_path(nxt_conf_validation_t *vldt,
+ nxt_conf_value_t *value, void *data)
+{
+ if (nxt_conf_type(value) == NXT_CONF_ARRAY) {
+ return nxt_conf_vldt_array_iterator(vldt, value,
+ &nxt_conf_vldt_python_path_element);
+ }
+
+ /* NXT_CONF_STRING */
+
+ return NXT_OK;
+}
+
+
+static nxt_int_t
+nxt_conf_vldt_python_path_element(nxt_conf_validation_t *vldt,
+ nxt_conf_value_t *value)
+{
+ if (nxt_conf_type(value) != NXT_CONF_STRING) {
+ return nxt_conf_vldt_error(vldt, "The \"path\" array must contain "
+ "only string values.");
+ }
+
+ return NXT_OK;
+}
+
+
+static nxt_int_t
nxt_conf_vldt_python_protocol(nxt_conf_validation_t *vldt,
nxt_conf_value_t *value, void *data)
{
diff --git a/src/nxt_controller.c b/src/nxt_controller.c
index 9a34a877..772d10c8 100644
--- a/src/nxt_controller.c
+++ b/src/nxt_controller.c
@@ -1686,7 +1686,10 @@ nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
static void
nxt_controller_conf_store(nxt_task_t *task, nxt_conf_value_t *conf)
{
+ void *mem;
+ u_char *end;
size_t size;
+ nxt_fd_t fd;
nxt_buf_t *b;
nxt_port_t *main_port;
nxt_runtime_t *rt;
@@ -1697,14 +1700,38 @@ nxt_controller_conf_store(nxt_task_t *task, nxt_conf_value_t *conf)
size = nxt_conf_json_length(conf, NULL);
- b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size);
+ fd = nxt_shm_open(task, size);
+ if (nxt_slow_path(fd == -1)) {
+ return;
+ }
+
+ mem = nxt_mem_mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
+ if (nxt_slow_path(mem == MAP_FAILED)) {
+ goto fail;
+ }
+
+ end = nxt_conf_json_print(mem, conf, NULL);
+
+ nxt_mem_munmap(mem, size);
- if (nxt_fast_path(b != NULL)) {
- b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL);
+ size = end - (u_char *) mem;
- (void) nxt_port_socket_write(task, main_port, NXT_PORT_MSG_CONF_STORE,
- -1, 0, -1, b);
+ b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, sizeof(size_t), 0);
+ if (nxt_slow_path(b == NULL)) {
+ goto fail;
}
+
+ b->mem.free = nxt_cpymem(b->mem.pos, &size, sizeof(size_t));
+
+ (void) nxt_port_socket_write(task, main_port,
+ NXT_PORT_MSG_CONF_STORE | NXT_PORT_MSG_CLOSE_FD,
+ fd, 0, -1, b);
+
+ return;
+
+fail:
+
+ nxt_fd_close(fd);
}
diff --git a/src/nxt_h1proto.c b/src/nxt_h1proto.c
index dccbe56c..d3da6942 100644
--- a/src/nxt_h1proto.c
+++ b/src/nxt_h1proto.c
@@ -174,6 +174,8 @@ static nxt_http_field_proc_t nxt_h1p_fields[] = {
{ nxt_string("Content-Type"), &nxt_http_request_field,
offsetof(nxt_http_request_t, content_type) },
{ nxt_string("Content-Length"), &nxt_http_request_content_length, 0 },
+ { nxt_string("Authorization"), &nxt_http_request_field,
+ offsetof(nxt_http_request_t, authorization) },
};
@@ -1151,19 +1153,19 @@ static const nxt_str_t nxt_http_client_error[] = {
nxt_string("HTTP/1.1 415 Unsupported Media Type\r\n"),
nxt_string("HTTP/1.1 416 Range Not Satisfiable\r\n"),
nxt_string("HTTP/1.1 417 Expectation Failed\r\n"),
- nxt_string("HTTP/1.1 418\r\n"),
- nxt_string("HTTP/1.1 419\r\n"),
- nxt_string("HTTP/1.1 420\r\n"),
- nxt_string("HTTP/1.1 421\r\n"),
- nxt_string("HTTP/1.1 422\r\n"),
- nxt_string("HTTP/1.1 423\r\n"),
- nxt_string("HTTP/1.1 424\r\n"),
- nxt_string("HTTP/1.1 425\r\n"),
+ nxt_string("HTTP/1.1 418 I'm a teapot\r\n"),
+ nxt_string("HTTP/1.1 419 \r\n"),
+ nxt_string("HTTP/1.1 420 \r\n"),
+ nxt_string("HTTP/1.1 421 Misdirected Request\r\n"),
+ nxt_string("HTTP/1.1 422 Unprocessable Entity\r\n"),
+ nxt_string("HTTP/1.1 423 Locked\r\n"),
+ nxt_string("HTTP/1.1 424 Failed Dependency\r\n"),
+ nxt_string("HTTP/1.1 425 \r\n"),
nxt_string("HTTP/1.1 426 Upgrade Required\r\n"),
- nxt_string("HTTP/1.1 427\r\n"),
- nxt_string("HTTP/1.1 428\r\n"),
- nxt_string("HTTP/1.1 429\r\n"),
- nxt_string("HTTP/1.1 430\r\n"),
+ nxt_string("HTTP/1.1 427 \r\n"),
+ nxt_string("HTTP/1.1 428 \r\n"),
+ nxt_string("HTTP/1.1 429 \r\n"),
+ nxt_string("HTTP/1.1 430 \r\n"),
nxt_string("HTTP/1.1 431 Request Header Fields Too Large\r\n"),
};
@@ -1190,7 +1192,7 @@ static const nxt_str_t nxt_http_server_error[] = {
};
-#define UNKNOWN_STATUS_LENGTH nxt_length("HTTP/1.1 65536\r\n")
+#define UNKNOWN_STATUS_LENGTH nxt_length("HTTP/1.1 999 \r\n")
static void
nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r,
@@ -1248,13 +1250,16 @@ nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r,
{
status = &nxt_http_server_error[n - NXT_HTTP_INTERNAL_SERVER_ERROR];
- } else {
- p = nxt_sprintf(buf, buf + UNKNOWN_STATUS_LENGTH,
- "HTTP/1.1 %03d\r\n", n);
+ } else if (n <= NXT_HTTP_STATUS_MAX) {
+ (void) nxt_sprintf(buf, buf + UNKNOWN_STATUS_LENGTH,
+ "HTTP/1.1 %03d \r\n", n);
- unknown_status.length = p - buf;
+ unknown_status.length = UNKNOWN_STATUS_LENGTH;
unknown_status.start = buf;
status = &unknown_status;
+
+ } else {
+ status = &nxt_http_server_error[0];
}
size = status->length;
diff --git a/src/nxt_http.h b/src/nxt_http.h
index 1418be95..e30bfeb4 100644
--- a/src/nxt_http.h
+++ b/src/nxt_http.h
@@ -156,6 +156,7 @@ struct nxt_http_request_s {
nxt_http_field_t *cookie;
nxt_http_field_t *referer;
nxt_http_field_t *user_agent;
+ nxt_http_field_t *authorization;
nxt_off_t content_length_n;
nxt_sockaddr_t *remote;
diff --git a/src/nxt_http_route.c b/src/nxt_http_route.c
index 9aaa708e..28545fc9 100644
--- a/src/nxt_http_route.c
+++ b/src/nxt_http_route.c
@@ -813,6 +813,12 @@ nxt_http_route_ruleset_create(nxt_task_t *task, nxt_mp_t *mp,
next = 0;
+ /*
+ * A workaround for GCC 10 with -flto -O2 flags that warns about "name"
+ * may be uninitialized in nxt_http_route_rule_name_create().
+ */
+ nxt_str_null(&name);
+
for (i = 0; i < n; i++) {
rule_cv = nxt_conf_next_object_member(ruleset_cv, &name, &next);
diff --git a/src/nxt_http_static.c b/src/nxt_http_static.c
index 5687ef2c..df2655fc 100644
--- a/src/nxt_http_static.c
+++ b/src/nxt_http_static.c
@@ -395,12 +395,15 @@ static void
nxt_http_static_buf_completion(nxt_task_t *task, void *obj, void *data)
{
ssize_t n, size;
- nxt_buf_t *b, *fb;
+ nxt_buf_t *b, *fb, *next;
nxt_off_t rest;
nxt_http_request_t *r;
b = obj;
r = data;
+
+complete_buf:
+
fb = r->out;
if (nxt_slow_path(fb == NULL || r->error)) {
@@ -424,6 +427,8 @@ nxt_http_static_buf_completion(nxt_task_t *task, void *obj, void *data)
goto clean;
}
+ next = b->next;
+
if (n == rest) {
nxt_file_close(task, fb->file);
r->out = NULL;
@@ -439,12 +444,24 @@ nxt_http_static_buf_completion(nxt_task_t *task, void *obj, void *data)
b->mem.free = b->mem.pos + n;
nxt_http_request_send(task, r, b);
+
+ if (next != NULL) {
+ b = next;
+ goto complete_buf;
+ }
+
return;
clean:
- nxt_mp_free(r->mem_pool, b);
- nxt_mp_release(r->mem_pool);
+ do {
+ next = b->next;
+
+ nxt_mp_free(r->mem_pool, b);
+ nxt_mp_release(r->mem_pool);
+
+ b = next;
+ } while (b != NULL);
if (fb != NULL) {
nxt_file_close(task, fb->file);
diff --git a/src/nxt_isolation.c b/src/nxt_isolation.c
index 1e6323bc..cab0074b 100644
--- a/src/nxt_isolation.c
+++ b/src/nxt_isolation.c
@@ -676,12 +676,6 @@ nxt_isolation_unmount_all(nxt_task_t *task, nxt_process_t *process)
return;
}
-#if (NXT_HAVE_CLONE_NEWNS)
- if (nxt_is_clone_flag_set(process->isolation.clone.flags, NEWNS)) {
- return;
- }
-#endif
-
nxt_debug(task, "unmount all (%s)", process->name);
automount = &process->isolation.automount;
diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c
index 0cde435b..f20f2c2c 100644
--- a/src/nxt_main_process.c
+++ b/src/nxt_main_process.c
@@ -182,7 +182,7 @@ static nxt_conf_map_t nxt_python_app_conf[] = {
{
nxt_string("path"),
- NXT_CONF_MAP_STR,
+ NXT_CONF_MAP_PTR,
offsetof(nxt_common_app_conf_t, u.python.path),
},
@@ -1001,21 +1001,22 @@ nxt_main_port_socket_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_listening_socket_t ls;
u_char message[2048];
+ port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
+ msg->port_msg.reply_port);
+ if (nxt_slow_path(port == NULL)) {
+ return;
+ }
+
b = msg->buf;
sa = (nxt_sockaddr_t *) b->mem.pos;
/* TODO check b size and make plain */
- out = NULL;
-
ls.socket = -1;
ls.error = NXT_SOCKET_ERROR_SYSTEM;
ls.start = message;
ls.end = message + sizeof(message);
- port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
- msg->port_msg.reply_port);
-
nxt_debug(task, "listening socket \"%*s\"",
(size_t) sa->length, nxt_sockaddr_start(sa));
@@ -1025,6 +1026,8 @@ nxt_main_port_socket_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_debug(task, "socket(\"%*s\"): %d",
(size_t) sa->length, nxt_sockaddr_start(sa), ls.socket);
+ out = NULL;
+
type = NXT_PORT_MSG_RPC_READY_LAST | NXT_PORT_MSG_CLOSE_FD;
} else {
@@ -1034,13 +1037,11 @@ nxt_main_port_socket_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
out = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
size + 1);
- if (nxt_slow_path(out == NULL)) {
- return;
- }
+ if (nxt_fast_path(out != NULL)) {
+ *out->mem.free++ = (uint8_t) ls.error;
- *out->mem.free++ = (uint8_t) ls.error;
-
- out->mem.free = nxt_cpymem(out->mem.free, ls.start, size);
+ out->mem.free = nxt_cpymem(out->mem.free, ls.start, size);
+ }
type = NXT_PORT_MSG_RPC_ERROR;
}
@@ -1408,12 +1409,45 @@ nxt_app_lang_compare(const void *v1, const void *v2)
static void
nxt_main_port_conf_store_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
- ssize_t n, size, offset;
- nxt_buf_t *b;
+ void *p;
+ size_t size;
+ ssize_t n;
nxt_int_t ret;
nxt_file_t file;
nxt_runtime_t *rt;
+ p = MAP_FAILED;
+
+ /*
+ * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be
+ * initialized in 'cleanup' section.
+ */
+ size = 0;
+
+ if (nxt_slow_path(msg->fd[0] == -1)) {
+ nxt_alert(task, "conf_store_handler: invalid shm fd");
+ goto error;
+ }
+
+ if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
+ nxt_alert(task, "conf_store_handler: unexpected buffer size (%d)",
+ (int) nxt_buf_mem_used_size(&msg->buf->mem));
+ goto error;
+ }
+
+ nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
+
+ p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0);
+
+ nxt_fd_close(msg->fd[0]);
+ msg->fd[0] = -1;
+
+ if (nxt_slow_path(p == MAP_FAILED)) {
+ goto error;
+ }
+
+ nxt_debug(task, "conf_store_handler(%uz): %*s", size, size, p);
+
nxt_memzero(&file, sizeof(nxt_file_t));
rt = task->thread->runtime;
@@ -1427,33 +1461,35 @@ nxt_main_port_conf_store_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
goto error;
}
- offset = 0;
-
- for (b = msg->buf; b != NULL; b = b->next) {
- size = nxt_buf_mem_used_size(&b->mem);
-
- n = nxt_file_write(&file, b->mem.pos, size, offset);
+ n = nxt_file_write(&file, p, size, 0);
- if (nxt_slow_path(n != size)) {
- nxt_file_close(task, &file);
- (void) nxt_file_delete(file.name);
- goto error;
- }
+ nxt_file_close(task, &file);
- offset += n;
+ if (nxt_slow_path(n != (ssize_t) size)) {
+ (void) nxt_file_delete(file.name);
+ goto error;
}
- nxt_file_close(task, &file);
-
ret = nxt_file_rename(file.name, (nxt_file_name_t *) rt->conf);
if (nxt_fast_path(ret == NXT_OK)) {
- return;
+ goto cleanup;
}
error:
nxt_alert(task, "failed to store current configuration");
+
+cleanup:
+
+ if (p != MAP_FAILED) {
+ nxt_mem_munmap(p, size);
+ }
+
+ if (msg->fd[0] != -1) {
+ nxt_fd_close(msg->fd[0]);
+ msg->fd[0] = -1;
+ }
}
diff --git a/src/nxt_php_sapi.c b/src/nxt_php_sapi.c
index d2fbdd27..369e7f32 100644
--- a/src/nxt_php_sapi.c
+++ b/src/nxt_php_sapi.c
@@ -1038,6 +1038,17 @@ nxt_php_execute(nxt_php_run_ctx_t *ctx, nxt_unit_request_t *r)
ctx->cookie = nxt_unit_sptr_get(&f->value);
}
+ if (r->authorization_field != NXT_UNIT_NONE_FIELD) {
+ f = r->fields + r->authorization_field;
+
+ php_handle_auth_data(nxt_unit_sptr_get(&f->value));
+
+ } else {
+ SG(request_info).auth_digest = NULL;
+ SG(request_info).auth_user = NULL;
+ SG(request_info).auth_password = NULL;
+ }
+
SG(sapi_headers).http_response_code = 200;
SG(request_info).path_translated = NULL;
diff --git a/src/nxt_port.c b/src/nxt_port.c
index dbcdec11..d4e46564 100644
--- a/src/nxt_port.c
+++ b/src/nxt_port.c
@@ -8,6 +8,7 @@
#include <nxt_runtime.h>
#include <nxt_port.h>
#include <nxt_router.h>
+#include <nxt_app_queue.h>
#include <nxt_port_queue.h>
@@ -84,6 +85,8 @@ nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid,
void
nxt_port_close(nxt_task_t *task, nxt_port_t *port)
{
+ size_t size;
+
nxt_debug(task, "port %p %d:%d close, type %d", port, port->pid,
port->id, port->type);
@@ -109,7 +112,10 @@ nxt_port_close(nxt_task_t *task, nxt_port_t *port)
}
if (port->queue != NULL) {
- nxt_mem_munmap(port->queue, sizeof(nxt_port_queue_t));
+ size = (port->id == (nxt_port_id_t) -1) ? sizeof(nxt_app_queue_t)
+ : sizeof(nxt_port_queue_t);
+ nxt_mem_munmap(port->queue, size);
+
port->queue = NULL;
}
}
diff --git a/src/nxt_process.h b/src/nxt_process.h
index 7afb8803..4f24b179 100644
--- a/src/nxt_process.h
+++ b/src/nxt_process.h
@@ -107,8 +107,6 @@ struct nxt_process_s {
nxt_port_mmaps_t incoming;
- nxt_thread_mutex_t cp_mutex;
-
uint32_t stream;
nxt_mp_t *mem_pool;
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 9dd5c30e..03fe2a6c 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -699,26 +699,39 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
void *p;
size_t size;
nxt_int_t ret;
+ nxt_port_t *port;
nxt_router_temp_conf_t *tmcf;
+ port = nxt_runtime_port_find(task->thread->runtime,
+ msg->port_msg.pid,
+ msg->port_msg.reply_port);
+ if (nxt_slow_path(port == NULL)) {
+ nxt_alert(task, "conf_data_handler: reply port not found");
+ return;
+ }
+
+ p = MAP_FAILED;
+
+ /*
+ * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be
+ * initialized in 'cleanup' section.
+ */
+ size = 0;
+
tmcf = nxt_router_temp_conf(task);
if (nxt_slow_path(tmcf == NULL)) {
- return;
+ goto fail;
}
if (nxt_slow_path(msg->fd[0] == -1)) {
- nxt_alert(task, "conf_data_handler: invalid file shm fd");
- return;
+ nxt_alert(task, "conf_data_handler: invalid shm fd");
+ goto fail;
}
if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)",
(int) nxt_buf_mem_used_size(&msg->buf->mem));
-
- nxt_fd_close(msg->fd[0]);
- msg->fd[0] = -1;
-
- return;
+ goto fail;
}
nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
@@ -729,22 +742,14 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
msg->fd[0] = -1;
if (nxt_slow_path(p == MAP_FAILED)) {
- return;
+ goto fail;
}
nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p);
tmcf->router_conf->router = nxt_router;
tmcf->stream = msg->port_msg.stream;
- tmcf->port = nxt_runtime_port_find(task->thread->runtime,
- msg->port_msg.pid,
- msg->port_msg.reply_port);
-
- if (nxt_slow_path(tmcf->port == NULL)) {
- nxt_alert(task, "reply port not found");
-
- goto fail;
- }
+ tmcf->port = port;
nxt_port_use(task, tmcf->port, 1);
@@ -757,9 +762,27 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_router_conf_error(task, tmcf);
}
+ goto cleanup;
+
fail:
- nxt_mem_munmap(p, size);
+ nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
+ msg->port_msg.stream, 0, NULL);
+
+ if (tmcf != NULL) {
+ nxt_mp_destroy(tmcf->mem_pool);
+ }
+
+cleanup:
+
+ if (p != MAP_FAILED) {
+ nxt_mem_munmap(p, size);
+ }
+
+ if (msg->fd[0] != -1) {
+ nxt_fd_close(msg->fd[0]);
+ msg->fd[0] = -1;
+ }
}
@@ -1586,6 +1609,8 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
ret = nxt_router_app_queue_init(task, port);
if (nxt_slow_path(ret != NXT_OK)) {
+ nxt_port_write_close(port);
+ nxt_port_read_close(port);
nxt_port_use(task, port, -1);
return NXT_ERROR;
}
@@ -3771,7 +3796,10 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_buf_chain_add(&b, nxt_http_buf_last(r));
req_rpc_data->rpc_cancel = 0;
- req_rpc_data->apr_action = NXT_APR_GOT_RESPONSE;
+
+ if (req_rpc_data->apr_action == NXT_APR_REQUEST_FAILED) {
+ req_rpc_data->apr_action = NXT_APR_GOT_RESPONSE;
+ }
nxt_request_rpc_data_unlink(task, req_rpc_data);
@@ -5169,6 +5197,7 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
req->content_length_field = NXT_UNIT_NONE_FIELD;
req->content_type_field = NXT_UNIT_NONE_FIELD;
req->cookie_field = NXT_UNIT_NONE_FIELD;
+ req->authorization_field = NXT_UNIT_NONE_FIELD;
dst_field = req->fields;
@@ -5193,6 +5222,9 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
} else if (field == r->cookie) {
req->cookie_field = dst_field - req->fields;
+
+ } else if (field == r->authorization) {
+ req->authorization_field = dst_field - req->fields;
}
nxt_debug(task, "add field 0x%04Xd, %d, %d, %p : %d %p",
@@ -5369,7 +5401,7 @@ nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_bool_t ack;
nxt_process_t *process;
nxt_free_map_t *m;
- nxt_port_mmap_header_t *hdr;
+ nxt_port_mmap_handler_t *mmap_handler;
nxt_debug(task, "oosm in %PI", msg->port_msg.pid);
@@ -5390,8 +5422,13 @@ nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_thread_mutex_lock(&process->incoming.mutex);
for (i = 0; i < process->incoming.size; i++) {
- hdr = process->incoming.elts[i].mmap_handler->hdr;
- m = hdr->free_map;
+ mmap_handler = process->incoming.elts[i].mmap_handler;
+
+ if (nxt_slow_path(mmap_handler == NULL)) {
+ continue;
+ }
+
+ m = mmap_handler->hdr->free_map;
for (mi = 0; mi < MAX_FREE_IDX; mi++) {
if (m[mi] != 0) {
diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c
index d9d986da..8a86d38a 100644
--- a/src/nxt_runtime.c
+++ b/src/nxt_runtime.c
@@ -1387,7 +1387,6 @@ nxt_runtime_process_new(nxt_runtime_t *rt)
nxt_queue_init(&process->ports);
nxt_thread_mutex_create(&process->incoming.mutex);
- nxt_thread_mutex_create(&process->cp_mutex);
process->use_count = 1;
@@ -1408,7 +1407,6 @@ nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process)
nxt_port_mmaps_destroy(&process->incoming, 1);
nxt_thread_mutex_destroy(&process->incoming.mutex);
- nxt_thread_mutex_destroy(&process->cp_mutex);
/* processes from nxt_runtime_process_get() have no memory pool */
if (process->mem_pool != NULL) {
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index 097f50d6..2fef17c5 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -784,8 +784,8 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
{
int rc;
int ready_fd, router_fd, read_in_fd, read_out_fd;
- char *unit_init, *version_end;
- long version_length;
+ char *unit_init, *version_end, *vars;
+ size_t version_length;
int64_t ready_pid, router_pid, read_pid;
uint32_t ready_stream, router_id, ready_id, read_id;
@@ -797,21 +797,30 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
return NXT_UNIT_ERROR;
}
- nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init);
+ version_end = strchr(unit_init, ';');
+ if (nxt_slow_path(version_end == NULL)) {
+ nxt_unit_alert(NULL, "Unit version not found in %s=\"%s\"",
+ NXT_UNIT_INIT_ENV, unit_init);
- version_length = nxt_length(NXT_VERSION);
+ return NXT_UNIT_ERROR;
+ }
- version_end = strchr(unit_init, ';');
- if (version_end == NULL
- || version_end - unit_init != version_length
- || memcmp(unit_init, NXT_VERSION, version_length) != 0)
- {
- nxt_unit_alert(NULL, "version check error");
+ version_length = version_end - unit_init;
+
+ rc = version_length != nxt_length(NXT_VERSION)
+ || memcmp(unit_init, NXT_VERSION, nxt_length(NXT_VERSION));
+
+ if (nxt_slow_path(rc != 0)) {
+ nxt_unit_alert(NULL, "versions mismatch: the Unit daemon has version "
+ "%.*s, while the app was compiled with libunit %s",
+ (int) version_length, unit_init, NXT_VERSION);
return NXT_UNIT_ERROR;
}
- rc = sscanf(version_end + 1,
+ vars = version_end + 1;
+
+ rc = sscanf(vars,
"%"PRIu32";"
"%"PRId64",%"PRIu32",%d;"
"%"PRId64",%"PRIu32",%d;"
@@ -823,12 +832,22 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
&read_pid, &read_id, &read_in_fd, &read_out_fd,
log_fd, shm_limit);
+ if (nxt_slow_path(rc == EOF)) {
+ nxt_unit_alert(NULL, "sscanf(%s) failed: %s (%d) for %s env",
+ vars, strerror(errno), errno, NXT_UNIT_INIT_ENV);
+
+ return NXT_UNIT_ERROR;
+ }
+
if (nxt_slow_path(rc != 13)) {
- nxt_unit_alert(NULL, "failed to scan variables: %d", rc);
+ nxt_unit_alert(NULL, "invalid number of variables in %s env: "
+ "found %d of %d in %s", NXT_UNIT_INIT_ENV, rc, 13, vars);
return NXT_UNIT_ERROR;
}
+ nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init);
+
nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id);
ready_port->in_fd = -1;
@@ -3587,7 +3606,10 @@ nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
return NXT_UNIT_ERROR;
}
- res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
+ do {
+ res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
+ } while (res == NXT_UNIT_AGAIN);
+
if (res == NXT_UNIT_ERROR) {
nxt_unit_read_buf_release(ctx, rbuf);
@@ -4994,7 +5016,6 @@ nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
int rc;
nxt_unit_impl_t *lib;
nxt_unit_read_buf_t *rbuf;
- nxt_unit_ctx_impl_t *ctx_impl;
rbuf = nxt_unit_read_buf_get(ctx);
if (nxt_slow_path(rbuf == NULL)) {
@@ -5002,9 +5023,6 @@ nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
}
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
- ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
-
-retry:
if (port == lib->shared_port) {
rc = nxt_unit_shared_port_recv(ctx, port, rbuf);
@@ -5030,15 +5048,6 @@ retry:
nxt_unit_process_ready_req(ctx);
- if (ctx_impl->online) {
- rbuf = nxt_unit_read_buf_get(ctx);
- if (nxt_slow_path(rbuf == NULL)) {
- return NXT_UNIT_ERROR;
- }
-
- goto retry;
- }
-
return rc;
}
@@ -6073,7 +6082,10 @@ static int
nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
nxt_unit_read_buf_t *rbuf)
{
- int res;
+ int res;
+ nxt_unit_port_impl_t *port_impl;
+
+ port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
retry:
@@ -6086,6 +6098,8 @@ retry:
}
if (nxt_unit_is_read_queue(rbuf)) {
+ nxt_app_queue_notification_received(port_impl->queue);
+
nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue",
(int) port->id.pid, (int) port->id.id, (int) rbuf->size);
diff --git a/src/nxt_unit_request.h b/src/nxt_unit_request.h
index fede00d2..5dbf648d 100644
--- a/src/nxt_unit_request.h
+++ b/src/nxt_unit_request.h
@@ -31,6 +31,7 @@ struct nxt_unit_request_s {
uint32_t content_length_field;
uint32_t content_type_field;
uint32_t cookie_field;
+ uint32_t authorization_field;
uint64_t content_length;
diff --git a/src/python/nxt_python.c b/src/python/nxt_python.c
index faf0c0e1..d8204937 100644
--- a/src/python/nxt_python.c
+++ b/src/python/nxt_python.c
@@ -24,6 +24,7 @@ typedef struct {
static nxt_int_t nxt_python_start(nxt_task_t *task,
nxt_process_data_t *data);
+static nxt_int_t nxt_python_set_path(nxt_task_t *task, nxt_conf_value_t *value);
static int nxt_python_init_threads(nxt_python_app_conf_t *c);
static int nxt_python_ready_handler(nxt_unit_ctx_t *ctx);
static void *nxt_python_thread_func(void *main_ctx);
@@ -67,7 +68,7 @@ nxt_python_start(nxt_task_t *task, nxt_process_data_t *data)
int rc;
char *nxt_py_module;
size_t len;
- PyObject *obj, *pypath, *module;
+ PyObject *obj, *module;
nxt_str_t proto;
const char *callable;
nxt_unit_ctx_t *unit_ctx;
@@ -162,38 +163,18 @@ nxt_python_start(nxt_task_t *task, nxt_process_data_t *data)
}
nxt_py_stderr_flush = PyObject_GetAttrString(obj, "flush");
+
+ /* obj is a Borrowed reference. */
+ obj = NULL;
+
if (nxt_slow_path(nxt_py_stderr_flush == NULL)) {
nxt_alert(task, "Python failed to get \"flush\" attribute of "
"\"sys.stderr\" object");
goto fail;
}
- /* obj is a Borrowed reference. */
-
- if (c->path.length > 0) {
- obj = PyString_FromStringAndSize((char *) c->path.start,
- c->path.length);
-
- if (nxt_slow_path(obj == NULL)) {
- nxt_alert(task, "Python failed to create string object \"%V\"",
- &c->path);
- goto fail;
- }
-
- pypath = PySys_GetObject((char *) "path");
-
- if (nxt_slow_path(pypath == NULL)) {
- nxt_alert(task, "Python failed to get \"sys.path\" list");
- goto fail;
- }
-
- if (nxt_slow_path(PyList_Insert(pypath, 0, obj) != 0)) {
- nxt_alert(task, "Python failed to insert \"%V\" into \"sys.path\"",
- &c->path);
- goto fail;
- }
-
- Py_DECREF(obj);
+ if (nxt_slow_path(nxt_python_set_path(task, c->path) != NXT_OK)) {
+ goto fail;
}
obj = Py_BuildValue("[s]", "unit");
@@ -317,6 +298,74 @@ fail:
}
+static nxt_int_t
+nxt_python_set_path(nxt_task_t *task, nxt_conf_value_t *value)
+{
+ int ret;
+ PyObject *path, *sys;
+ nxt_str_t str;
+ nxt_uint_t n;
+ nxt_conf_value_t *array;
+
+ if (value == NULL) {
+ return NXT_OK;
+ }
+
+ sys = PySys_GetObject((char *) "path");
+ if (nxt_slow_path(sys == NULL)) {
+ nxt_alert(task, "Python failed to get \"sys.path\" list");
+ return NXT_ERROR;
+ }
+
+ /* sys is a Borrowed reference. */
+
+ if (nxt_conf_type(value) == NXT_CONF_STRING) {
+ n = 0;
+ goto value_is_string;
+ }
+
+ /* NXT_CONF_ARRAY */
+ array = value;
+
+ n = nxt_conf_array_elements_count(array);
+
+ while (n != 0) {
+ n--;
+
+ /*
+ * Insertion in front of existing paths starting from the last element
+ * to preserve original order while giving priority to the values
+ * specified in the "path" option.
+ */
+
+ value = nxt_conf_get_array_element(array, n);
+
+ value_is_string:
+
+ nxt_conf_get_string(value, &str);
+
+ path = PyString_FromStringAndSize((char *) str.start, str.length);
+ if (nxt_slow_path(path == NULL)) {
+ nxt_alert(task, "Python failed to create string object \"%V\"",
+ &str);
+ return NXT_ERROR;
+ }
+
+ ret = PyList_Insert(sys, 0, path);
+
+ Py_DECREF(path);
+
+ if (nxt_slow_path(ret != 0)) {
+ nxt_alert(task, "Python failed to insert \"%V\" into \"sys.path\"",
+ &str);
+ return NXT_ERROR;
+ }
+ }
+
+ return NXT_OK;
+}
+
+
static int
nxt_python_init_threads(nxt_python_app_conf_t *c)
{
diff --git a/src/python/nxt_python_asgi.c b/src/python/nxt_python_asgi.c
index 98aeedf4..a6f94507 100644
--- a/src/python/nxt_python_asgi.c
+++ b/src/python/nxt_python_asgi.c
@@ -1131,11 +1131,12 @@ nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx)
static PyObject *
nxt_py_asgi_port_read(PyObject *self, PyObject *args)
{
- int rc;
- PyObject *arg;
- Py_ssize_t n;
- nxt_unit_ctx_t *ctx;
- nxt_unit_port_t *port;
+ int rc;
+ PyObject *arg0, *arg1, *res;
+ Py_ssize_t n;
+ nxt_unit_ctx_t *ctx;
+ nxt_unit_port_t *port;
+ nxt_py_asgi_ctx_data_t *ctx_data;
n = PyTuple_GET_SIZE(args);
@@ -1147,31 +1148,45 @@ nxt_py_asgi_port_read(PyObject *self, PyObject *args)
return PyErr_Format(PyExc_TypeError, "invalid number of arguments");
}
- arg = PyTuple_GET_ITEM(args, 0);
- if (nxt_slow_path(arg == NULL || PyLong_Check(arg) == 0)) {
+ arg0 = PyTuple_GET_ITEM(args, 0);
+ if (nxt_slow_path(arg0 == NULL || PyLong_Check(arg0) == 0)) {
return PyErr_Format(PyExc_TypeError,
"the first argument is not a long");
}
- ctx = PyLong_AsVoidPtr(arg);
+ ctx = PyLong_AsVoidPtr(arg0);
- arg = PyTuple_GET_ITEM(args, 1);
- if (nxt_slow_path(arg == NULL || PyLong_Check(arg) == 0)) {
+ arg1 = PyTuple_GET_ITEM(args, 1);
+ if (nxt_slow_path(arg1 == NULL || PyLong_Check(arg1) == 0)) {
return PyErr_Format(PyExc_TypeError,
"the second argument is not a long");
}
- port = PyLong_AsVoidPtr(arg);
-
- nxt_unit_debug(ctx, "asgi_port_read %p %p", ctx, port);
+ port = PyLong_AsVoidPtr(arg1);
rc = nxt_unit_process_port_msg(ctx, port);
+ nxt_unit_debug(ctx, "asgi_port_read(%p,%p): %d", ctx, port, rc);
+
if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
return PyErr_Format(PyExc_RuntimeError,
"error processing port %d message", port->id.id);
}
+ if (rc == NXT_UNIT_OK) {
+ ctx_data = ctx->data;
+
+ res = PyObject_CallFunctionObjArgs(ctx_data->loop_call_soon,
+ nxt_py_port_read,
+ arg0, arg1, NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_alert(ctx, "Python failed to call 'loop.call_soon'");
+ nxt_python_print_exception();
+ }
+
+ Py_XDECREF(res);
+ }
+
Py_RETURN_NONE;
}
diff --git a/src/python/nxt_python_wsgi.c b/src/python/nxt_python_wsgi.c
index da7b183c..77c45af5 100644
--- a/src/python/nxt_python_wsgi.c
+++ b/src/python/nxt_python_wsgi.c
@@ -59,6 +59,7 @@ static void nxt_python_wsgi_done(void);
static void nxt_python_request_handler(nxt_unit_request_info_t *req);
static PyObject *nxt_python_create_environ(nxt_python_app_conf_t *c);
+static PyObject *nxt_python_copy_environ(nxt_unit_request_info_t *req);
static PyObject *nxt_python_get_environ(nxt_python_ctx_t *pctx);
static int nxt_python_add_sptr(nxt_python_ctx_t *pctx, PyObject *name,
nxt_unit_sptr_t *sptr, uint32_t size);
@@ -221,6 +222,7 @@ nxt_python_wsgi_ctx_data_alloc(void **pdata)
}
pctx->write = NULL;
+ pctx->environ = NULL;
pctx->start_resp = PyCFunction_New(nxt_py_start_resp_method,
(PyObject *) pctx);
@@ -237,6 +239,11 @@ nxt_python_wsgi_ctx_data_alloc(void **pdata)
goto fail;
}
+ pctx->environ = nxt_python_copy_environ(NULL);
+ if (nxt_slow_path(pctx->environ == NULL)) {
+ goto fail;
+ }
+
*pdata = pctx;
return NXT_UNIT_OK;
@@ -258,6 +265,7 @@ nxt_python_wsgi_ctx_data_free(void *data)
Py_XDECREF(pctx->start_resp);
Py_XDECREF(pctx->write);
+ Py_XDECREF(pctx->environ);
Py_XDECREF(pctx);
}
@@ -295,6 +303,7 @@ nxt_python_request_handler(nxt_unit_request_info_t *req)
int rc;
PyObject *environ, *args, *response, *iterator, *item;
PyObject *close, *result;
+ nxt_bool_t prepare_environ;
nxt_python_ctx_t *pctx;
pctx = req->ctx->data;
@@ -305,6 +314,19 @@ nxt_python_request_handler(nxt_unit_request_info_t *req)
PyEval_RestoreThread(pctx->thread_state);
+ if (nxt_slow_path(pctx->environ == NULL)) {
+ pctx->environ = nxt_python_copy_environ(req);
+
+ if (pctx->environ == NULL) {
+ prepare_environ = 0;
+
+ rc = NXT_UNIT_ERROR;
+ goto done;
+ }
+ }
+
+ prepare_environ = 1;
+
environ = nxt_python_get_environ(pctx);
if (nxt_slow_path(environ == NULL)) {
rc = NXT_UNIT_ERROR;
@@ -418,6 +440,14 @@ done:
pctx->req = NULL;
nxt_unit_request_done(req, rc);
+
+ if (nxt_fast_path(prepare_environ)) {
+ PyEval_RestoreThread(pctx->thread_state);
+
+ pctx->environ = nxt_python_copy_environ(NULL);
+
+ pctx->thread_state = PyEval_SaveThread();
+ }
}
@@ -532,23 +562,30 @@ fail:
static PyObject *
-nxt_python_get_environ(nxt_python_ctx_t *pctx)
+nxt_python_copy_environ(nxt_unit_request_info_t *req)
{
- int rc;
- uint32_t i, j, vl;
- PyObject *environ;
- nxt_unit_field_t *f, *f2;
- nxt_unit_request_t *r;
+ PyObject *environ;
environ = PyDict_Copy(nxt_py_environ_ptyp);
+
if (nxt_slow_path(environ == NULL)) {
- nxt_unit_req_error(pctx->req,
+ nxt_unit_req_alert(req,
"Python failed to copy the \"environ\" dictionary");
-
- return NULL;
+ nxt_python_print_exception();
}
- pctx->environ = environ;
+ return environ;
+}
+
+
+static PyObject *
+nxt_python_get_environ(nxt_python_ctx_t *pctx)
+{
+ int rc;
+ uint32_t i, j, vl;
+ PyObject *environ;
+ nxt_unit_field_t *f, *f2;
+ nxt_unit_request_t *r;
r = pctx->req->request;
@@ -628,7 +665,7 @@ nxt_python_get_environ(nxt_python_ctx_t *pctx)
#undef RC
- if (nxt_slow_path(PyDict_SetItem(environ, nxt_py_wsgi_input_str,
+ if (nxt_slow_path(PyDict_SetItem(pctx->environ, nxt_py_wsgi_input_str,
(PyObject *) pctx) != 0))
{
nxt_unit_req_error(pctx->req,
@@ -636,11 +673,15 @@ nxt_python_get_environ(nxt_python_ctx_t *pctx)
goto fail;
}
+ environ = pctx->environ;
+ pctx->environ = NULL;
+
return environ;
fail:
- Py_DECREF(environ);
+ Py_DECREF(pctx->environ);
+ pctx->environ = NULL;
return NULL;
}
diff --git a/src/ruby/nxt_ruby.c b/src/ruby/nxt_ruby.c
index 698d4a43..0aad887d 100644
--- a/src/ruby/nxt_ruby.c
+++ b/src/ruby/nxt_ruby.c
@@ -38,6 +38,7 @@ static int nxt_ruby_init_io(nxt_ruby_ctx_t *rctx);
static void nxt_ruby_request_handler(nxt_unit_request_info_t *req);
static void *nxt_ruby_request_handler_gvl(void *req);
static int nxt_ruby_ready_handler(nxt_unit_ctx_t *ctx);
+static void *nxt_ruby_thread_create_gvl(void *rctx);
static VALUE nxt_ruby_thread_func(VALUE arg);
static void *nxt_ruby_unit_run(void *ctx);
static void nxt_ruby_ubf(void *ctx);
@@ -1141,7 +1142,7 @@ nxt_ruby_ready_handler(nxt_unit_ctx_t *ctx)
rctx->ctx = ctx;
- res = rb_thread_create(RUBY_METHOD_FUNC(nxt_ruby_thread_func), rctx);
+ res = (VALUE) rb_thread_call_with_gvl(nxt_ruby_thread_create_gvl, rctx);
if (nxt_fast_path(res != Qnil)) {
nxt_unit_debug(ctx, "thread #%d created", (int) (i + 1));
@@ -1159,6 +1160,17 @@ nxt_ruby_ready_handler(nxt_unit_ctx_t *ctx)
}
+static void *
+nxt_ruby_thread_create_gvl(void *rctx)
+{
+ VALUE res;
+
+ res = rb_thread_create(RUBY_METHOD_FUNC(nxt_ruby_thread_func), rctx);
+
+ return (void *) (uintptr_t) res;
+}
+
+
static VALUE
nxt_ruby_thread_func(VALUE arg)
{