diff options
author | Andrei Belov <defan@nginx.com> | 2019-12-26 17:52:09 +0300 |
---|---|---|
committer | Andrei Belov <defan@nginx.com> | 2019-12-26 17:52:09 +0300 |
commit | 35ff5ee1e82a03e57d625230173a84c829c13257 (patch) | |
tree | c3dce5e8d50c8da9739f23b41a636931ad562e25 /src | |
parent | 0ec222bbb202194327c2e76d48f0b2608b37c162 (diff) | |
parent | 55f8e31ed70910ef07db31d7f3c53b12774180f9 (diff) | |
download | unit-35ff5ee1e82a03e57d625230173a84c829c13257.tar.gz unit-35ff5ee1e82a03e57d625230173a84c829c13257.tar.bz2 |
Merged with the default branch.1.14.0-1
Diffstat (limited to '')
48 files changed, 3778 insertions, 787 deletions
diff --git a/src/go/unit/ldflags-lrt.go b/go/ldflags-lrt.go index f5a63508..f5a63508 100644 --- a/src/go/unit/ldflags-lrt.go +++ b/go/ldflags-lrt.go diff --git a/src/go/unit/nxt_cgo_lib.c b/go/nxt_cgo_lib.c index 5cb31b5a..a4fef9ea 100644 --- a/src/go/unit/nxt_cgo_lib.c +++ b/go/nxt_cgo_lib.c @@ -19,6 +19,7 @@ static ssize_t nxt_cgo_port_send(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id, const void *buf, size_t buf_size, const void *oob, size_t oob_size); static ssize_t nxt_cgo_port_recv(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id, void *buf, size_t buf_size, void *oob, size_t oob_size); +static void nxt_cgo_shm_ack_handler(nxt_unit_ctx_t *ctx); int nxt_cgo_run(uintptr_t handler) @@ -34,6 +35,7 @@ nxt_cgo_run(uintptr_t handler) init.callbacks.remove_port = nxt_cgo_remove_port; init.callbacks.port_send = nxt_cgo_port_send; init.callbacks.port_recv = nxt_cgo_port_recv; + init.callbacks.shm_ack_handler = nxt_cgo_shm_ack_handler; init.data = (void *) handler; @@ -137,6 +139,13 @@ nxt_cgo_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, } +static void +nxt_cgo_shm_ack_handler(nxt_unit_ctx_t *ctx) +{ + return nxt_go_shm_ack_handler(); +} + + int nxt_cgo_response_create(uintptr_t req, int status, int fields, uint32_t fields_size) @@ -166,15 +175,8 @@ nxt_cgo_response_send(uintptr_t req) ssize_t nxt_cgo_response_write(uintptr_t req, uintptr_t start, uint32_t len) { - int rc; - - rc = nxt_unit_response_write((nxt_unit_request_info_t *) req, - (void *) start, len); - if (rc != NXT_UNIT_OK) { - return -1; - } - - return len; + return nxt_unit_response_write_nb((nxt_unit_request_info_t *) req, + (void *) start, len, 0); } diff --git a/src/go/unit/nxt_cgo_lib.h b/go/nxt_cgo_lib.h index 5317380b..5317380b 100644 --- a/src/go/unit/nxt_cgo_lib.h +++ b/go/nxt_cgo_lib.h diff --git a/src/go/unit/port.go b/go/port.go index a68cae74..a68cae74 100644 --- a/src/go/unit/port.go +++ b/go/port.go diff --git a/src/go/unit/request.go b/go/request.go index 1d8c6702..1d8c6702 100644 --- a/src/go/unit/request.go +++ b/go/request.go diff --git a/src/go/unit/response.go b/go/response.go index 767d66b7..bfa79656 100644 --- a/src/go/unit/response.go +++ b/go/response.go @@ -19,6 +19,7 @@ type response struct { headerSent bool req *http.Request c_req C.uintptr_t + ch chan int } func new_response(c_req C.uintptr_t, req *http.Request) *response { @@ -40,8 +41,26 @@ func (r *response) Write(p []byte) (n int, err error) { r.WriteHeader(http.StatusOK) } - res := C.nxt_cgo_response_write(r.c_req, buf_ref(p), C.uint32_t(len(p))) - return int(res), nil + l := len(p) + written := int(0) + br := buf_ref(p) + + for written < l { + res := C.nxt_cgo_response_write(r.c_req, br, C.uint32_t(l - written)) + + written += int(res) + br += C.uintptr_t(res) + + if (written < l) { + if r.ch == nil { + r.ch = make(chan int, 2) + } + + wait_shm_ack(r.ch) + } + } + + return written, nil } func (r *response) WriteHeader(code int) { @@ -85,3 +104,16 @@ func (r *response) Flush() { r.WriteHeader(http.StatusOK) } } + +var observer_registry_ observable + +func wait_shm_ack(c chan int) { + observer_registry_.attach(c) + + _ = <-c +} + +//export nxt_go_shm_ack_handler +func nxt_go_shm_ack_handler() { + observer_registry_.notify(1) +} diff --git a/src/go/unit/unit.go b/go/unit.go index 1534479e..1534479e 100644 --- a/src/go/unit/unit.go +++ b/go/unit.go diff --git a/src/nodejs/unit-http/http_server.js b/src/nodejs/unit-http/http_server.js index c42149a5..2f324329 100644 --- a/src/nodejs/unit-http/http_server.js +++ b/src/nodejs/unit-http/http_server.js @@ -227,6 +227,7 @@ ServerResponse.prototype._write = unit_lib.response_write; ServerResponse.prototype._writeBody = function(chunk, encoding, callback) { var contentLength = 0; + var res, o; this._sendHeaders(); @@ -247,11 +248,32 @@ ServerResponse.prototype._writeBody = function(chunk, encoding, callback) { if (typeof chunk === 'string') { contentLength = Buffer.byteLength(chunk, encoding); + if (contentLength > unit_lib.buf_min) { + chunk = Buffer.from(chunk, encoding); + + contentLength = chunk.length; + } + } else { contentLength = chunk.length; } - this._write(chunk, contentLength); + if (this.server._output.length > 0 || !this.socket.writable) { + o = new BufferedOutput(this, 0, chunk, encoding, callback); + this.server._output.push(o); + + return false; + } + + res = this._write(chunk, 0, contentLength); + if (res < contentLength) { + this.socket.writable = false; + + o = new BufferedOutput(this, res, chunk, encoding, callback); + this.server._output.push(o); + + return false; + } } if (typeof callback === 'function') { @@ -265,29 +287,48 @@ ServerResponse.prototype._writeBody = function(chunk, encoding, callback) { * the event loop. All callbacks passed to process.nextTick() * will be resolved before the event loop continues. */ - process.nextTick(function () { - callback(this); - }.bind(this)); + process.nextTick(callback); } + + return true; }; ServerResponse.prototype.write = function write(chunk, encoding, callback) { if (this.finished) { - throw new Error("Write after end"); - } + if (typeof encoding === 'function') { + callback = encoding; + encoding = null; + } - this._writeBody(chunk, encoding, callback); + var err = new Error("Write after end"); + process.nextTick(() => { + this.emit('error', err); - return true; + if (typeof callback === 'function') { + callback(err); + } + }) + } + + return this._writeBody(chunk, encoding, callback); }; ServerResponse.prototype._end = unit_lib.response_end; ServerResponse.prototype.end = function end(chunk, encoding, callback) { if (!this.finished) { - this._writeBody(chunk, encoding, callback); + if (typeof encoding === 'function') { + callback = encoding; + encoding = null; + } - this._end(); + this._writeBody(chunk, encoding, () => { + this._end(); + + if (typeof callback === 'function') { + callback(); + } + }); this.finished = true; } @@ -393,6 +434,9 @@ function Server(requestListener) { this._upgradeListenerCount--; } }); + + this._output = []; + this._drain_resp = new Set(); } util.inherits(Server, EventEmitter); @@ -429,6 +473,63 @@ Server.prototype.emit_close = function () { this.emit('close'); }; +Server.prototype.emit_drain = function () { + var res, o, l; + + if (this._output.length <= 0) { + return; + } + + while (this._output.length > 0) { + o = this._output[0]; + + if (typeof o.chunk === 'string') { + l = Buffer.byteLength(o.chunk, o.encoding); + + } else { + l = o.chunk.length; + } + + res = o.resp._write(o.chunk, o.offset, l); + + o.offset += res; + if (o.offset < l) { + return; + } + + this._drain_resp.add(o.resp); + + if (typeof o.callback === 'function') { + process.nextTick(o.callback); + } + + this._output.shift(); + } + + for (var resp of this._drain_resp) { + + if (resp.socket.writable) { + continue; + } + + resp.socket.writable = true; + + process.nextTick(() => { + resp.emit("drain"); + }); + } + + this._drain_resp.clear(); +}; + +function BufferedOutput(resp, offset, chunk, encoding, callback) { + this.resp = resp; + this.offset = offset; + this.chunk = chunk; + this.encoding = encoding; + this.callback = callback; +} + function connectionListener(socket) { } diff --git a/src/nodejs/unit-http/unit.cpp b/src/nodejs/unit-http/unit.cpp index 10875703..1fa73689 100644 --- a/src/nodejs/unit-http/unit.cpp +++ b/src/nodejs/unit-http/unit.cpp @@ -70,6 +70,8 @@ Unit::init(napi_env env, napi_value exports) websocket_send_frame); napi.set_named_property(exports, "websocket_set_sock", websocket_set_sock); + napi.set_named_property(exports, "buf_min", nxt_unit_buf_min()); + napi.set_named_property(exports, "buf_max", nxt_unit_buf_max()); } catch (exception &e) { napi.throw_error(e); @@ -148,6 +150,7 @@ Unit::create_server(napi_env env, napi_callback_info info) unit_init.callbacks.request_handler = request_handler_cb; unit_init.callbacks.websocket_handler = websocket_handler_cb; unit_init.callbacks.close_handler = close_handler_cb; + unit_init.callbacks.shm_ack_handler = shm_ack_handler_cb; unit_init.callbacks.add_port = add_port; unit_init.callbacks.remove_port = remove_port; unit_init.callbacks.quit = quit_cb; @@ -308,6 +311,40 @@ Unit::close_handler(nxt_unit_request_info_t *req) } +void +Unit::shm_ack_handler_cb(nxt_unit_ctx_t *ctx) +{ + Unit *obj; + + obj = reinterpret_cast<Unit *>(ctx->unit->data); + + obj->shm_ack_handler(ctx); +} + + +void +Unit::shm_ack_handler(nxt_unit_ctx_t *ctx) +{ + napi_value server_obj, emit_drain; + + try { + nxt_handle_scope scope(env()); + + server_obj = get_server_object(); + + emit_drain = get_named_property(server_obj, "emit_drain"); + + nxt_async_context async_context(env(), "shm_ack_handler"); + nxt_callback_scope async_scope(async_context); + + make_callback(async_context, server_obj, emit_drain); + + } catch (exception &e) { + nxt_unit_warn(ctx, "shm_ack_handler: %s", e.str); + } +} + + static void nxt_uv_read_callback(uv_poll_t *handle, int status, int events) { @@ -748,47 +785,68 @@ Unit::response_write(napi_env env, napi_callback_info info) int ret; void *ptr; size_t argc, have_buf_len; - uint32_t buf_len; + ssize_t res_len; + uint32_t buf_start, buf_len; nxt_napi napi(env); napi_value this_arg; nxt_unit_buf_t *buf; napi_valuetype buf_type; nxt_unit_request_info_t *req; - napi_value argv[2]; + napi_value argv[3]; - argc = 2; + argc = 3; try { this_arg = napi.get_cb_info(info, argc, argv); - if (argc != 2) { + if (argc != 3) { throw exception("Wrong args count. Expected: " - "chunk, chunk length"); + "chunk, start, length"); } req = napi.get_request_info(this_arg); buf_type = napi.type_of(argv[0]); - buf_len = napi.get_value_uint32(argv[1]) + 1; - - buf = nxt_unit_response_buf_alloc(req, buf_len); - if (buf == NULL) { - throw exception("Failed to allocate response buffer"); - } + buf_start = napi.get_value_uint32(argv[1]); + buf_len = napi.get_value_uint32(argv[2]) + 1; if (buf_type == napi_string) { /* TODO: will work only for utf8 content-type */ + if (req->response_buf != NULL + && (req->response_buf->end - req->response_buf->free) + >= buf_len) + { + buf = req->response_buf; + + } else { + buf = nxt_unit_response_buf_alloc(req, buf_len); + if (buf == NULL) { + throw exception("Failed to allocate response buffer"); + } + } + have_buf_len = napi.get_value_string_utf8(argv[0], buf->free, buf_len); + buf->free += have_buf_len; + + ret = nxt_unit_buf_send(buf); + if (ret == NXT_UNIT_OK) { + res_len = have_buf_len; + } + } else { ptr = napi.get_buffer_info(argv[0], have_buf_len); - memcpy(buf->free, ptr, have_buf_len); - } + if (buf_start > 0) { + ptr = ((uint8_t *) ptr) + buf_start; + have_buf_len -= buf_start; + } - buf->free += have_buf_len; + res_len = nxt_unit_response_write_nb(req, ptr, have_buf_len, 0); + + ret = res_len < 0 ? -res_len : NXT_UNIT_OK; + } - ret = nxt_unit_buf_send(buf); if (ret != NXT_UNIT_OK) { throw exception("Failed to send body buf"); } @@ -797,7 +855,7 @@ Unit::response_write(napi_env env, napi_callback_info info) return nullptr; } - return this_arg; + return napi.create((int64_t) res_len); } diff --git a/src/nodejs/unit-http/unit.h b/src/nodejs/unit-http/unit.h index f5eaf9fd..18359118 100644 --- a/src/nodejs/unit-http/unit.h +++ b/src/nodejs/unit-http/unit.h @@ -36,6 +36,9 @@ private: static void close_handler_cb(nxt_unit_request_info_t *req); void close_handler(nxt_unit_request_info_t *req); + static void shm_ack_handler_cb(nxt_unit_ctx_t *ctx); + void shm_ack_handler(nxt_unit_ctx_t *ctx); + static int add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); static void remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id); diff --git a/src/nxt_application.h b/src/nxt_application.h index 2a1fa39e..e7177887 100644 --- a/src/nxt_application.h +++ b/src/nxt_application.h @@ -89,6 +89,9 @@ struct nxt_common_app_conf_s { nxt_conf_value_t *environment; nxt_conf_value_t *isolation; + nxt_conf_value_t *limits; + + size_t shm_limit; union { nxt_external_app_conf_t external; diff --git a/src/nxt_capability.c b/src/nxt_capability.c index 805faff6..dfa7a834 100644 --- a/src/nxt_capability.c +++ b/src/nxt_capability.c @@ -10,6 +10,16 @@ #include <linux/capability.h> #include <sys/syscall.h> + +#if (_LINUX_CAPABILITY_VERSION_3) +#define NXT_CAPABILITY_VERSION _LINUX_CAPABILITY_VERSION_3 +#elif (_LINUX_CAPABILITY_VERSION_2) +#define NXT_CAPABILITY_VERSION _LINUX_CAPABILITY_VERSION_2 +#else +#define NXT_CAPABILITY_VERSION _LINUX_CAPABILITY_VERSION +#endif + + #define nxt_capget(hdrp, datap) \ syscall(SYS_capget, hdrp, datap) #define nxt_capset(hdrp, datap) \ @@ -43,7 +53,7 @@ nxt_capability_linux_get_version() { struct __user_cap_header_struct hdr; - hdr.version = _LINUX_CAPABILITY_VERSION; + hdr.version = NXT_CAPABILITY_VERSION; hdr.pid = nxt_pid; nxt_capget(&hdr, NULL); diff --git a/src/nxt_clone.c b/src/nxt_clone.c index a2c376a3..9ee3c012 100644 --- a/src/nxt_clone.c +++ b/src/nxt_clone.c @@ -25,26 +25,18 @@ nxt_clone(nxt_int_t flags) #if (NXT_HAVE_CLONE_NEWUSER) -/* map uid 65534 to unit pid */ -#define NXT_DEFAULT_UNPRIV_MAP "65534 %d 1" - -nxt_int_t nxt_clone_proc_setgroups(nxt_task_t *task, pid_t child_pid, +nxt_int_t nxt_clone_credential_setgroups(nxt_task_t *task, pid_t child_pid, const char *str); -nxt_int_t nxt_clone_proc_map_set(nxt_task_t *task, const char* mapfile, - pid_t pid, nxt_int_t defval, nxt_conf_value_t *mapobj); -nxt_int_t nxt_clone_proc_map_write(nxt_task_t *task, const char *mapfile, +nxt_int_t nxt_clone_credential_map_set(nxt_task_t *task, const char* mapfile, + pid_t pid, nxt_int_t default_container, nxt_int_t default_host, + nxt_clone_credential_map_t *map); +nxt_int_t nxt_clone_credential_map_write(nxt_task_t *task, const char *mapfile, pid_t pid, u_char *mapinfo); -typedef struct { - nxt_int_t container; - nxt_int_t host; - nxt_int_t size; -} nxt_clone_procmap_t; - - nxt_int_t -nxt_clone_proc_setgroups(nxt_task_t *task, pid_t child_pid, const char *str) +nxt_clone_credential_setgroups(nxt_task_t *task, pid_t child_pid, + const char *str) { int fd, n; u_char *p, *end; @@ -89,8 +81,8 @@ nxt_clone_proc_setgroups(nxt_task_t *task, pid_t child_pid, const char *str) nxt_int_t -nxt_clone_proc_map_write(nxt_task_t *task, const char *mapfile, pid_t pid, - u_char *mapinfo) +nxt_clone_credential_map_write(nxt_task_t *task, const char *mapfile, + pid_t pid, u_char *mapinfo) { int len, mapfd; u_char *p, *end; @@ -139,17 +131,13 @@ nxt_clone_proc_map_write(nxt_task_t *task, const char *mapfile, pid_t pid, nxt_int_t -nxt_clone_proc_map_set(nxt_task_t *task, const char* mapfile, pid_t pid, - nxt_int_t defval, nxt_conf_value_t *mapobj) +nxt_clone_credential_map_set(nxt_task_t *task, const char* mapfile, pid_t pid, + nxt_int_t default_container, nxt_int_t default_host, + nxt_clone_credential_map_t *map) { - u_char *p, *end, *mapinfo; - nxt_int_t container, host, size; - nxt_int_t ret, len, count, i; - nxt_conf_value_t *obj, *value; - - static nxt_str_t str_cont = nxt_string("container"); - static nxt_str_t str_host = nxt_string("host"); - static nxt_str_t str_size = nxt_string("size"); + u_char *p, *end, *mapinfo; + nxt_int_t ret, len; + nxt_uint_t i; /* * uid_map one-entry size: @@ -157,44 +145,28 @@ nxt_clone_proc_map_set(nxt_task_t *task, const char* mapfile, pid_t pid, */ len = sizeof(u_char) * (10 + 10 + 10 + 2 + 1); - if (mapobj != NULL) { - count = nxt_conf_array_elements_count(mapobj); - - if (count == 0) { - goto default_map; - } - - len = len * count + 1; + if (map->size > 0) { + len = len * map->size + 1; mapinfo = nxt_malloc(len); if (nxt_slow_path(mapinfo == NULL)) { - nxt_alert(task, "failed to allocate uid_map buffer"); return NXT_ERROR; } p = mapinfo; end = mapinfo + len; - for (i = 0; i < count; i++) { - obj = nxt_conf_get_array_element(mapobj, i); + for (i = 0; i < map->size; i++) { + p = nxt_sprintf(p, end, "%d %d %d", map->map[i].container, + map->map[i].host, map->map[i].size); - value = nxt_conf_get_object_member(obj, &str_cont, NULL); - container = nxt_conf_get_integer(value); - - value = nxt_conf_get_object_member(obj, &str_host, NULL); - host = nxt_conf_get_integer(value); - - value = nxt_conf_get_object_member(obj, &str_size, NULL); - size = nxt_conf_get_integer(value); - - p = nxt_sprintf(p, end, "%d %d %d", container, host, size); if (nxt_slow_path(p == end)) { - nxt_alert(task, "write past the uid_map buffer"); + nxt_alert(task, "write past the mapinfo buffer"); nxt_free(mapinfo); return NXT_ERROR; } - if (i+1 < count) { + if (i+1 < map->size) { *p++ = '\n'; } else { @@ -203,27 +175,24 @@ nxt_clone_proc_map_set(nxt_task_t *task, const char* mapfile, pid_t pid, } } else { - -default_map: - mapinfo = nxt_malloc(len); if (nxt_slow_path(mapinfo == NULL)) { - nxt_alert(task, "failed to allocate uid_map buffer"); return NXT_ERROR; } end = mapinfo + len; - p = nxt_sprintf(mapinfo, end, NXT_DEFAULT_UNPRIV_MAP, defval); + p = nxt_sprintf(mapinfo, end, "%d %d 1", + default_container, default_host); *p = '\0'; if (nxt_slow_path(p == end)) { - nxt_alert(task, "write past the %s buffer", mapfile); + nxt_alert(task, "write past mapinfo buffer"); nxt_free(mapinfo); return NXT_ERROR; } } - ret = nxt_clone_proc_map_write(task, mapfile, pid, mapinfo); + ret = nxt_clone_credential_map_write(task, mapfile, pid, mapinfo); nxt_free(mapinfo); @@ -232,31 +201,50 @@ default_map: nxt_int_t -nxt_clone_proc_map(nxt_task_t *task, pid_t pid, nxt_process_clone_t *clone) +nxt_clone_credential_map(nxt_task_t *task, pid_t pid, + nxt_credential_t *app_creds, nxt_clone_t *clone) { nxt_int_t ret; - nxt_int_t uid, gid; + nxt_int_t default_host_uid; + nxt_int_t default_host_gid; const char *rule; nxt_runtime_t *rt; rt = task->thread->runtime; - uid = geteuid(); - gid = getegid(); - rule = rt->capabilities.setid ? "allow" : "deny"; + if (rt->capabilities.setid) { + rule = "allow"; + + /* + * By default we don't map a privileged user + */ + default_host_uid = app_creds->uid; + default_host_gid = app_creds->base_gid; + } else { + rule = "deny"; + + default_host_uid = nxt_euid; + default_host_gid = nxt_egid; + } + + ret = nxt_clone_credential_map_set(task, "uid_map", pid, app_creds->uid, + default_host_uid, + &clone->uidmap); - ret = nxt_clone_proc_map_set(task, "uid_map", pid, uid, clone->uidmap); if (nxt_slow_path(ret != NXT_OK)) { return NXT_ERROR; } - ret = nxt_clone_proc_setgroups(task, pid, rule); + ret = nxt_clone_credential_setgroups(task, pid, rule); if (nxt_slow_path(ret != NXT_OK)) { nxt_alert(task, "failed to write /proc/%d/setgroups", pid); return NXT_ERROR; } - ret = nxt_clone_proc_map_set(task, "gid_map", pid, gid, clone->gidmap); + ret = nxt_clone_credential_map_set(task, "gid_map", pid, app_creds->base_gid, + default_host_gid, + &clone->gidmap); + if (nxt_slow_path(ret != NXT_OK)) { return NXT_ERROR; } @@ -264,4 +252,197 @@ nxt_clone_proc_map(nxt_task_t *task, pid_t pid, nxt_process_clone_t *clone) return NXT_OK; } + +nxt_int_t +nxt_clone_vldt_credential_uidmap(nxt_task_t *task, + nxt_clone_credential_map_t *map, nxt_credential_t *creds) +{ + nxt_int_t id; + nxt_uint_t i; + nxt_runtime_t *rt; + nxt_clone_map_entry_t m; + + if (map->size == 0) { + return NXT_OK; + } + + rt = task->thread->runtime; + + if (!rt->capabilities.setid) { + if (nxt_slow_path(map->size > 1)) { + nxt_log(task, NXT_LOG_NOTICE, "\"uidmap\" field has %d entries " + "but unprivileged unit has a maximum of 1 map.", + map->size); + + return NXT_ERROR; + } + + id = map->map[0].host; + + if (nxt_slow_path((nxt_uid_t) id != nxt_euid)) { + nxt_log(task, NXT_LOG_NOTICE, "\"uidmap\" field has an entry for " + "host uid %d but unprivileged unit can only map itself " + "(uid %d) into child namespaces.", id, nxt_euid); + + return NXT_ERROR; + } + + return NXT_OK; + } + + for (i = 0; i < map->size; i++) { + m = map->map[i]; + + if (creds->uid >= (nxt_uid_t) m.container + && creds->uid < (nxt_uid_t) (m.container + m.size)) + { + return NXT_OK; + } + } + + nxt_log(task, NXT_LOG_NOTICE, "\"uidmap\" field has no \"container\" " + "entry for user \"%s\" (uid %d)", creds->user, creds->uid); + + return NXT_ERROR; +} + + +nxt_int_t +nxt_clone_vldt_credential_gidmap(nxt_task_t *task, + nxt_clone_credential_map_t *map, nxt_credential_t *creds) +{ + nxt_uint_t base_ok, gid_ok, gids_ok; + nxt_uint_t i, j; + nxt_runtime_t *rt; + nxt_clone_map_entry_t m; + + rt = task->thread->runtime; + + if (!rt->capabilities.setid) { + if (creds->ngroups > 0 + && !(creds->ngroups == 1 && creds->gids[0] == creds->base_gid)) { + nxt_log(task, NXT_LOG_NOTICE, + "unprivileged unit disallow supplementary groups for " + "new namespace (user \"%s\" has %d group%s).", + creds->user, creds->ngroups, + creds->ngroups > 1 ? "s" : ""); + + return NXT_ERROR; + } + + if (map->size == 0) { + return NXT_OK; + } + + if (nxt_slow_path(map->size > 1)) { + nxt_log(task, NXT_LOG_NOTICE, "\"gidmap\" field has %d entries " + "but unprivileged unit has a maximum of 1 map.", + map->size); + + return NXT_ERROR; + } + + m = map->map[0]; + + if (nxt_slow_path((nxt_gid_t) m.host != nxt_egid)) { + nxt_log(task, NXT_LOG_ERR, "\"gidmap\" field has an entry for " + "host gid %d but unprivileged unit can only map itself " + "(gid %d) into child namespaces.", m.host, nxt_egid); + + return NXT_ERROR; + } + + if (nxt_slow_path(m.size > 1)) { + nxt_log(task, NXT_LOG_ERR, "\"gidmap\" field has an entry with " + "\"size\": %d, but for unprivileged unit it must be 1.", + m.size); + + return NXT_ERROR; + } + + if (nxt_slow_path((nxt_gid_t) m.container != creds->base_gid)) { + nxt_log(task, NXT_LOG_ERR, + "\"gidmap\" field has no \"container\" entry for gid %d.", + creds->base_gid); + + return NXT_ERROR; + } + + return NXT_OK; + } + + if (map->size == 0) { + if (creds->ngroups > 0 + && !(creds->ngroups == 1 && creds->gids[0] == creds->base_gid)) + { + nxt_log(task, NXT_LOG_ERR, "\"gidmap\" field has no entries " + "but user \"%s\" has %d suplementary group%s.", + creds->user, creds->ngroups, + creds->ngroups > 1 ? "s" : ""); + + return NXT_ERROR; + } + + return NXT_OK; + } + + base_ok = 0; + gids_ok = 0; + + for (i = 0; i < creds->ngroups; i++) { + gid_ok = 0; + + for (j = 0; j < map->size; j++) { + m = map->map[j]; + + if (!base_ok && creds->base_gid >= (nxt_gid_t) m.container + && creds->base_gid < (nxt_gid_t) (m.container+m.size)) + { + base_ok = 1; + } + + if (creds->gids[i] >= (nxt_gid_t) m.container + && creds->gids[i] < (nxt_gid_t) (m.container+m.size)) + { + gid_ok = 1; + break; + } + } + + if (nxt_fast_path(gid_ok)) { + gids_ok++; + } + } + + if (!base_ok) { + for (i = 0; i < map->size; i++) { + m = map->map[i]; + + if (creds->base_gid >= (nxt_gid_t) m.container + && creds->base_gid < (nxt_gid_t) (m.container+m.size)) + { + base_ok = 1; + break; + } + } + } + + if (nxt_slow_path(!base_ok)) { + nxt_log(task, NXT_LOG_ERR, "\"gidmap\" field has no \"container\" " + "entry for gid %d.", creds->base_gid); + + return NXT_ERROR; + } + + if (nxt_slow_path(gids_ok < creds->ngroups)) { + nxt_log(task, NXT_LOG_ERR, "\"gidmap\" field has missing " + "suplementary gid mappings (found %d out of %d).", gids_ok, + creds->ngroups); + + return NXT_ERROR; + } + + return NXT_OK; +} + #endif diff --git a/src/nxt_clone.h b/src/nxt_clone.h index 50dec0b4..dcccf1db 100644 --- a/src/nxt_clone.h +++ b/src/nxt_clone.h @@ -7,11 +7,47 @@ #define _NXT_CLONE_INCLUDED_ +#if (NXT_HAVE_CLONE_NEWUSER) + +typedef struct { + nxt_int_t container; + nxt_int_t host; + nxt_int_t size; +} nxt_clone_map_entry_t; + +typedef struct { + nxt_uint_t size; + nxt_clone_map_entry_t *map; +} nxt_clone_credential_map_t; + +#endif + +typedef struct { + nxt_int_t flags; + +#if (NXT_HAVE_CLONE_NEWUSER) + nxt_clone_credential_map_t uidmap; + nxt_clone_credential_map_t gidmap; +#endif + +} nxt_clone_t; + + pid_t nxt_clone(nxt_int_t flags); + #if (NXT_HAVE_CLONE_NEWUSER) -nxt_int_t nxt_clone_proc_map(nxt_task_t *task, pid_t pid, - nxt_process_clone_t *clone); + +#define NXT_CLONE_USER(flags) \ + ((flags & CLONE_NEWUSER) == CLONE_NEWUSER) + +NXT_EXPORT nxt_int_t nxt_clone_credential_map(nxt_task_t *task, pid_t pid, + nxt_credential_t *creds, nxt_clone_t *clone); +NXT_EXPORT nxt_int_t nxt_clone_vldt_credential_uidmap(nxt_task_t *task, + nxt_clone_credential_map_t *map, nxt_credential_t *creds); +NXT_EXPORT nxt_int_t nxt_clone_vldt_credential_gidmap(nxt_task_t *task, + nxt_clone_credential_map_t *map, nxt_credential_t *creds); + #endif #endif /* _NXT_CLONE_INCLUDED_ */ diff --git a/src/nxt_conf_validation.c b/src/nxt_conf_validation.c index 105af675..5a1f7839 100644 --- a/src/nxt_conf_validation.c +++ b/src/nxt_conf_validation.c @@ -9,6 +9,8 @@ #include <nxt_cert.h> #include <nxt_router.h> #include <nxt_http.h> +#include <nxt_sockaddr.h> +#include <nxt_http_route_addr.h> typedef enum { @@ -82,6 +84,10 @@ static nxt_int_t nxt_conf_vldt_match_patterns_set_member( nxt_conf_validation_t *vldt, nxt_str_t *name, nxt_conf_value_t *value); static nxt_int_t nxt_conf_vldt_match_scheme_pattern(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data); +static nxt_int_t nxt_conf_vldt_match_addrs(nxt_conf_validation_t *vldt, + nxt_conf_value_t *value, void *data); +static nxt_int_t nxt_conf_vldt_match_addr(nxt_conf_validation_t *vldt, + nxt_conf_value_t *value); static nxt_int_t nxt_conf_vldt_app_name(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data); static nxt_int_t nxt_conf_vldt_app(nxt_conf_validation_t *vldt, @@ -283,6 +289,16 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_match_members[] = { &nxt_conf_vldt_match_patterns, NULL }, + { nxt_string("source"), + NXT_CONF_VLDT_STRING | NXT_CONF_VLDT_ARRAY, + &nxt_conf_vldt_match_addrs, + NULL }, + + { nxt_string("destination"), + NXT_CONF_VLDT_STRING | NXT_CONF_VLDT_ARRAY, + &nxt_conf_vldt_match_addrs, + NULL }, + { nxt_string("uri"), NXT_CONF_VLDT_STRING | NXT_CONF_VLDT_ARRAY, &nxt_conf_vldt_match_patterns, @@ -358,6 +374,11 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_app_limits_members[] = { NULL, NULL }, + { nxt_string("shm"), + NXT_CONF_VLDT_INTEGER, + NULL, + NULL }, + NXT_CONF_VLDT_END }; @@ -1150,6 +1171,63 @@ nxt_conf_vldt_match_pattern(nxt_conf_validation_t *vldt, static nxt_int_t +nxt_conf_vldt_match_addrs(nxt_conf_validation_t *vldt, + nxt_conf_value_t *value, void *data) +{ + if (nxt_conf_type(value) == NXT_CONF_ARRAY) { + return nxt_conf_vldt_array_iterator(vldt, value, + &nxt_conf_vldt_match_addr); + } + + return nxt_conf_vldt_match_addr(vldt, value); +} + + +static nxt_int_t +nxt_conf_vldt_match_addr(nxt_conf_validation_t *vldt, + nxt_conf_value_t *value) +{ + nxt_http_route_addr_pattern_t pattern; + + switch (nxt_http_route_addr_pattern_parse(vldt->pool, &pattern, value)) { + + case NXT_OK: + return NXT_OK; + + case NXT_ADDR_PATTERN_PORT_ERROR: + return nxt_conf_vldt_error(vldt, "The \"address\" port an invalid " + "port."); + + case NXT_ADDR_PATTERN_CV_TYPE_ERROR: + return nxt_conf_vldt_error(vldt, "The \"match\" pattern for " + "\"address\" must be a string."); + + case NXT_ADDR_PATTERN_LENGTH_ERROR: + return nxt_conf_vldt_error(vldt, "The \"address\" is too short."); + + case NXT_ADDR_PATTERN_FORMAT_ERROR: + return nxt_conf_vldt_error(vldt, "The \"address\" format is invalid."); + + case NXT_ADDR_PATTERN_RANGE_OVERLAP_ERROR: + return nxt_conf_vldt_error(vldt, "The \"address\" range is " + "overlapping."); + + case NXT_ADDR_PATTERN_CIDR_ERROR: + return nxt_conf_vldt_error(vldt, "The \"address\" has an invalid CIDR " + "prefix."); + + case NXT_ADDR_PATTERN_NO_IPv6_ERROR: + return nxt_conf_vldt_error(vldt, "The \"address\" does not support " + "IPv6 with your configuration."); + + default: + return nxt_conf_vldt_error(vldt, "The \"address\" has an unknown " + "format."); + } +} + + +static nxt_int_t nxt_conf_vldt_match_scheme_pattern(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data) { diff --git a/src/nxt_credential.c b/src/nxt_credential.c new file mode 100644 index 00000000..168db9cf --- /dev/null +++ b/src/nxt_credential.c @@ -0,0 +1,350 @@ +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + +#include <nxt_main.h> + + +static nxt_int_t nxt_credential_groups_get(nxt_task_t *task, nxt_mp_t *mp, + nxt_credential_t *uc); + + +nxt_int_t +nxt_credential_get(nxt_task_t *task, nxt_mp_t *mp, nxt_credential_t *uc, + const char *group) +{ + struct group *grp; + struct passwd *pwd; + + nxt_errno = 0; + + pwd = getpwnam(uc->user); + + if (nxt_slow_path(pwd == NULL)) { + + if (nxt_errno == 0) { + nxt_alert(task, "getpwnam(\"%s\") failed, user \"%s\" not found", + uc->user, uc->user); + } else { + nxt_alert(task, "getpwnam(\"%s\") failed %E", uc->user, nxt_errno); + } + + return NXT_ERROR; + } + + uc->uid = pwd->pw_uid; + uc->base_gid = pwd->pw_gid; + + if (group != NULL && group[0] != '\0') { + nxt_errno = 0; + + grp = getgrnam(group); + + if (nxt_slow_path(grp == NULL)) { + + if (nxt_errno == 0) { + nxt_alert(task, + "getgrnam(\"%s\") failed, group \"%s\" not found", + group, group); + } else { + nxt_alert(task, "getgrnam(\"%s\") failed %E", group, nxt_errno); + } + + return NXT_ERROR; + } + + uc->base_gid = grp->gr_gid; + } + + nxt_debug(task, "about to get \"%s\" groups (uid:%d, base gid:%d)", + uc->user, uc->uid, uc->base_gid); + + if (nxt_credential_groups_get(task, mp, uc) != NXT_OK) { + return NXT_ERROR; + } + +#if (NXT_DEBUG) + { + u_char *p, *end; + nxt_uint_t i; + u_char msg[NXT_MAX_ERROR_STR]; + + p = msg; + end = msg + NXT_MAX_ERROR_STR; + + for (i = 0; i < uc->ngroups; i++) { + p = nxt_sprintf(p, end, "%d%c", uc->gids[i], + i+1 < uc->ngroups ? ',' : '\0'); + } + + nxt_debug(task, "user \"%s\" has gids:%*s", uc->user, p - msg, msg); + } +#endif + + return NXT_OK; +} + + +#if (NXT_HAVE_GETGROUPLIST && !NXT_MACOSX) + +#define NXT_NGROUPS nxt_min(256, NGROUPS_MAX) + + +static nxt_int_t +nxt_credential_groups_get(nxt_task_t *task, nxt_mp_t *mp, + nxt_credential_t *uc) +{ + int ngroups; + gid_t groups[NXT_NGROUPS]; + + ngroups = NXT_NGROUPS; + + if (getgrouplist(uc->user, uc->base_gid, groups, &ngroups) < 0) { + if (nxt_slow_path(ngroups <= NXT_NGROUPS)) { + nxt_alert(task, "getgrouplist(\"%s\", %d, ...) failed %E", uc->user, + uc->base_gid, nxt_errno); + + return NXT_ERROR; + } + } + + if (ngroups > NXT_NGROUPS) { + if (ngroups > NGROUPS_MAX) { + ngroups = NGROUPS_MAX; + } + + uc->ngroups = ngroups; + + uc->gids = nxt_mp_alloc(mp, ngroups * sizeof(gid_t)); + if (nxt_slow_path(uc->gids == NULL)) { + return NXT_ERROR; + } + + if (nxt_slow_path(getgrouplist(uc->user, uc->base_gid, uc->gids, + &ngroups) < 0)) { + + nxt_alert(task, "getgrouplist(\"%s\", %d) failed %E", uc->user, + uc->base_gid, nxt_errno); + + return NXT_ERROR; + } + + return NXT_OK; + } + + uc->ngroups = ngroups; + + uc->gids = nxt_mp_alloc(mp, ngroups * sizeof(gid_t)); + if (nxt_slow_path(uc->gids == NULL)) { + return NXT_ERROR; + } + + nxt_memcpy(uc->gids, groups, ngroups * sizeof(gid_t)); + + return NXT_OK; +} + + +#else + +/* + * For operating systems that lack getgrouplist(3) or it's buggy (MacOS), + * nxt_credential_groups_get() stores an array of groups IDs which should be + * set by the setgroups() function for a given user. The initgroups() + * may block a just forked worker process for some time if LDAP or NDIS+ + * is used, so nxt_credential_groups_get() allows to get worker user groups in + * main process. In a nutshell the initgroups() calls getgrouplist() + * followed by setgroups(). However older Solaris lacks the getgrouplist(). + * Besides getgrouplist() does not allow to query the exact number of + * groups in some platforms, while NGROUPS_MAX can be quite large (e.g. + * 65536 on Linux). + * So nxt_credential_groups_get() emulates getgrouplist(): at first the + * function saves the super-user groups IDs, then calls initgroups() and saves + * the specified user groups IDs, and then restores the super-user groups IDs. + * This works at least on Linux, FreeBSD, and Solaris, but does not work + * on MacOSX, getgroups(2): + * + * To provide compatibility with applications that use getgroups() in + * environments where users may be in more than {NGROUPS_MAX} groups, + * a variant of getgroups(), obtained when compiling with either the + * macros _DARWIN_UNLIMITED_GETGROUPS or _DARWIN_C_SOURCE defined, can + * be used that is not limited to {NGROUPS_MAX} groups. However, this + * variant only returns the user's default group access list and not + * the group list modified by a call to setgroups(2). + * + * For such cases initgroups() is used in worker process as fallback. + */ + +static nxt_int_t +nxt_credential_groups_get(nxt_task_t *task, nxt_mp_t *mp, nxt_credential_t *uc) +{ + int nsaved, ngroups; + nxt_int_t ret; + nxt_gid_t *saved; + + nsaved = getgroups(0, NULL); + + if (nxt_slow_path(nsaved == -1)) { + nxt_alert(task, "getgroups(0, NULL) failed %E", nxt_errno); + return NXT_ERROR; + } + + nxt_debug(task, "getgroups(0, NULL): %d", nsaved); + + if (nsaved > NGROUPS_MAX) { + /* MacOSX case. */ + + uc->gids = NULL; + uc->ngroups = 0; + + return NXT_OK; + } + + saved = nxt_mp_alloc(mp, nsaved * sizeof(nxt_gid_t)); + + if (nxt_slow_path(saved == NULL)) { + return NXT_ERROR; + } + + ret = NXT_ERROR; + + nsaved = getgroups(nsaved, saved); + + if (nxt_slow_path(nsaved == -1)) { + nxt_alert(task, "getgroups(%d) failed %E", nsaved, nxt_errno); + goto free; + } + + nxt_debug(task, "getgroups(): %d", nsaved); + + if (initgroups(uc->user, uc->base_gid) != 0) { + if (nxt_errno == NXT_EPERM) { + nxt_log(task, NXT_LOG_NOTICE, + "initgroups(%s, %d) failed %E, ignored", + uc->user, uc->base_gid, nxt_errno); + + ret = NXT_OK; + + goto free; + + } else { + nxt_alert(task, "initgroups(%s, %d) failed %E", + uc->user, uc->base_gid, nxt_errno); + goto restore; + } + } + + ngroups = getgroups(0, NULL); + + if (nxt_slow_path(ngroups == -1)) { + nxt_alert(task, "getgroups(0, NULL) failed %E", nxt_errno); + goto restore; + } + + nxt_debug(task, "getgroups(0, NULL): %d", ngroups); + + uc->gids = nxt_mp_alloc(mp, ngroups * sizeof(nxt_gid_t)); + + if (nxt_slow_path(uc->gids == NULL)) { + goto restore; + } + + ngroups = getgroups(ngroups, uc->gids); + + if (nxt_slow_path(ngroups == -1)) { + nxt_alert(task, "getgroups(%d) failed %E", ngroups, nxt_errno); + goto restore; + } + + uc->ngroups = ngroups; + + ret = NXT_OK; + +restore: + + if (nxt_slow_path(setgroups(nsaved, saved) != 0)) { + nxt_alert(task, "setgroups(%d) failed %E", nsaved, nxt_errno); + ret = NXT_ERROR; + } + +free: + + nxt_mp_free(mp, saved); + + return ret; +} + + +#endif + + +nxt_int_t +nxt_credential_setuid(nxt_task_t *task, nxt_credential_t *uc) +{ + nxt_debug(task, "user cred set: \"%s\" uid:%d", uc->user, uc->uid); + + if (setuid(uc->uid) != 0) { + +#if (NXT_HAVE_CLONE) + if (nxt_errno == EINVAL) { + nxt_log(task, NXT_LOG_ERR, "The uid %d (user \"%s\") isn't " + "valid in the application namespace.", uc->uid, uc->user); + return NXT_ERROR; + } +#endif + + nxt_alert(task, "setuid(%d) failed %E", uc->uid, nxt_errno); + return NXT_ERROR; + } + + return NXT_OK; +} + + +nxt_int_t +nxt_credential_setgids(nxt_task_t *task, nxt_credential_t *uc) +{ + nxt_runtime_t *rt; + + nxt_debug(task, "user cred set gids: base gid:%d, ngroups: %d", + uc->base_gid, uc->ngroups); + + rt = task->thread->runtime; + + if (setgid(uc->base_gid) != 0) { + +#if (NXT_HAVE_CLONE) + if (nxt_errno == EINVAL) { + nxt_log(task, NXT_LOG_ERR, "The gid %d isn't valid in the " + "application namespace.", uc->base_gid); + return NXT_ERROR; + } +#endif + + nxt_alert(task, "setgid(%d) failed %E", uc->base_gid, nxt_errno); + return NXT_ERROR; + } + + if (!rt->capabilities.setid) { + return NXT_OK; + } + + if (nxt_slow_path(uc->ngroups > 0 + && setgroups(uc->ngroups, uc->gids) != 0)) { + +#if (NXT_HAVE_CLONE) + if (nxt_errno == EINVAL) { + nxt_log(task, NXT_LOG_ERR, "The user \"%s\" (uid: %d) has " + "supplementary group ids not valid in the application " + "namespace.", uc->user, uc->uid); + return NXT_ERROR; + } +#endif + + nxt_alert(task, "setgroups(%i) failed %E", uc->ngroups, nxt_errno); + return NXT_ERROR; + } + + return NXT_OK; +} diff --git a/src/nxt_credential.h b/src/nxt_credential.h new file mode 100644 index 00000000..243eba83 --- /dev/null +++ b/src/nxt_credential.h @@ -0,0 +1,30 @@ +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + +#ifndef _NXT_CREDENTIAL_H_INCLUDED_ +#define _NXT_CREDENTIAL_H_INCLUDED_ + + +typedef uid_t nxt_uid_t; +typedef gid_t nxt_gid_t; + +typedef struct { + const char *user; + nxt_uid_t uid; + nxt_gid_t base_gid; + nxt_uint_t ngroups; + nxt_gid_t *gids; +} nxt_credential_t; + + +NXT_EXPORT nxt_int_t nxt_credential_get(nxt_task_t *task, nxt_mp_t *mp, + nxt_credential_t *uc, const char *group); +NXT_EXPORT nxt_int_t nxt_credential_setuid(nxt_task_t *task, + nxt_credential_t *uc); +NXT_EXPORT nxt_int_t nxt_credential_setgids(nxt_task_t *task, + nxt_credential_t *uc); + + +#endif /* _NXT_CREDENTIAL_H_INCLUDED_ */ diff --git a/src/nxt_external.c b/src/nxt_external.c index 89fe08c8..e498a938 100644 --- a/src/nxt_external.c +++ b/src/nxt_external.c @@ -98,11 +98,11 @@ nxt_external_init(nxt_task_t *task, nxt_common_app_conf_t *conf) "%s;%uD;" "%PI,%ud,%d;" "%PI,%ud,%d;" - "%d,%Z", + "%d,%z,%Z", NXT_VERSION, my_port->process->init->stream, main_port->pid, main_port->id, main_port->pair[1], my_port->pid, my_port->id, my_port->pair[0], - 2); + 2, conf->shm_limit); if (nxt_slow_path(p == end)) { nxt_alert(task, "internal error: buffer too small for NXT_UNIT_INIT"); diff --git a/src/nxt_h1proto.c b/src/nxt_h1proto.c index b07eaf84..8ce57893 100644 --- a/src/nxt_h1proto.c +++ b/src/nxt_h1proto.c @@ -1230,6 +1230,7 @@ nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r, c = h1p->conn; c->write = header; + h1p->conn_write_tail = &header->next; c->write_state = &nxt_h1p_request_send_state; if (body_handler != NULL) { @@ -1342,8 +1343,14 @@ nxt_h1p_request_send(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *out) nxt_conn_write(task->thread->engine, c); } else { - nxt_buf_chain_add(&c->write, out); + *h1p->conn_write_tail = out; } + + while (out->next != NULL) { + out = out->next; + } + + h1p->conn_write_tail = &out->next; } @@ -1730,9 +1737,10 @@ nxt_h1p_idle_timeout(nxt_task_t *task, void *obj, void *data) static void nxt_h1p_idle_response(nxt_task_t *task, nxt_conn_t *c) { - u_char *p; - size_t size; - nxt_buf_t *out, *last; + u_char *p; + size_t size; + nxt_buf_t *out, *last; + nxt_h1proto_t *h1p; size = nxt_length(NXT_H1P_IDLE_TIMEOUT) + nxt_http_date_cache.size @@ -1762,6 +1770,9 @@ nxt_h1p_idle_response(nxt_task_t *task, nxt_conn_t *c) last->completion_handler = nxt_h1p_idle_response_sent; last->parent = c; + h1p = c->socket.data; + h1p->conn_write_tail = &last->next; + c->write = out; c->write_state = &nxt_h1p_timeout_response_state; diff --git a/src/nxt_h1proto.h b/src/nxt_h1proto.h index 61da6770..3294713f 100644 --- a/src/nxt_h1proto.h +++ b/src/nxt_h1proto.h @@ -40,6 +40,8 @@ struct nxt_h1proto_s { nxt_http_request_t *request; nxt_buf_t *buffers; + + nxt_buf_t **conn_write_tail; /* * All fields before the conn field will * be zeroed in a keep-alive connection. diff --git a/src/nxt_http_route.c b/src/nxt_http_route.c index 18b352ea..ef9593b7 100644 --- a/src/nxt_http_route.c +++ b/src/nxt_http_route.c @@ -6,6 +6,8 @@ #include <nxt_router.h> #include <nxt_http.h> +#include <nxt_sockaddr.h> +#include <nxt_http_route_addr.h> typedef enum { @@ -16,6 +18,8 @@ typedef enum { NXT_HTTP_ROUTE_ARGUMENT, NXT_HTTP_ROUTE_COOKIE, NXT_HTTP_ROUTE_SCHEME, + NXT_HTTP_ROUTE_SOURCE, + NXT_HTTP_ROUTE_DESTINATION, } nxt_http_route_object_t; @@ -50,6 +54,8 @@ typedef struct { nxt_conf_value_t *arguments; nxt_conf_value_t *cookies; nxt_conf_value_t *scheme; + nxt_conf_value_t *source; + nxt_conf_value_t *destination; } nxt_http_route_match_conf_t; @@ -118,9 +124,18 @@ typedef struct { } nxt_http_route_table_t; +typedef struct { + /* The object must be the first field. */ + nxt_http_route_object_t object:8; + uint32_t items; + nxt_http_route_addr_pattern_t addr_pattern[0]; +} nxt_http_route_addr_rule_t; + + typedef union { nxt_http_route_rule_t *rule; nxt_http_route_table_t *table; + nxt_http_route_addr_rule_t *addr_rule; } nxt_http_route_test_t; @@ -170,6 +185,8 @@ static nxt_http_route_ruleset_t *nxt_http_route_ruleset_create(nxt_task_t *task, static nxt_http_route_rule_t *nxt_http_route_rule_name_create(nxt_task_t *task, nxt_mp_t *mp, nxt_conf_value_t *rule_cv, nxt_str_t *name, nxt_bool_t case_sensitive); +static nxt_http_route_addr_rule_t *nxt_http_route_addr_rule_create( + nxt_task_t *task, nxt_mp_t *mp, nxt_conf_value_t *cv); static nxt_http_route_rule_t *nxt_http_route_rule_create(nxt_task_t *task, nxt_mp_t *mp, nxt_conf_value_t *cv, nxt_bool_t case_sensitive, nxt_http_route_pattern_case_t pattern_case); @@ -190,12 +207,14 @@ static void nxt_http_route_cleanup(nxt_task_t *task, nxt_http_route_t *routes); static nxt_http_action_t *nxt_http_route_handler(nxt_task_t *task, nxt_http_request_t *r, nxt_http_action_t *start); -static nxt_http_action_t *nxt_http_route_match(nxt_http_request_t *r, - nxt_http_route_match_t *match); +static nxt_http_action_t *nxt_http_route_match(nxt_task_t *task, + nxt_http_request_t *r, nxt_http_route_match_t *match); static nxt_int_t nxt_http_route_table(nxt_http_request_t *r, nxt_http_route_table_t *table); static nxt_int_t nxt_http_route_ruleset(nxt_http_request_t *r, nxt_http_route_ruleset_t *ruleset); +static nxt_int_t nxt_http_route_addr_rule(nxt_http_request_t *r, + nxt_http_route_addr_rule_t *addr_rule, nxt_sockaddr_t *sockaddr); static nxt_int_t nxt_http_route_rule(nxt_http_request_t *r, nxt_http_route_rule_t *rule); static nxt_int_t nxt_http_route_header(nxt_http_request_t *r, @@ -329,6 +348,18 @@ static nxt_conf_map_t nxt_http_route_match_conf[] = { NXT_CONF_MAP_PTR, offsetof(nxt_http_route_match_conf_t, cookies), }, + + { + nxt_string("source"), + NXT_CONF_MAP_PTR, + offsetof(nxt_http_route_match_conf_t, source), + }, + + { + nxt_string("destination"), + NXT_CONF_MAP_PTR, + offsetof(nxt_http_route_match_conf_t, destination), + }, }; @@ -381,6 +412,7 @@ nxt_http_route_match_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_http_route_rule_t *rule; nxt_http_route_table_t *table; nxt_http_route_match_t *match; + nxt_http_route_addr_rule_t *addr_rule; nxt_http_route_match_conf_t mtcf; static nxt_str_t match_path = nxt_string("/match"); @@ -505,6 +537,28 @@ nxt_http_route_match_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, test++; } + if (mtcf.source != NULL) { + addr_rule = nxt_http_route_addr_rule_create(task, mp, mtcf.source); + if (addr_rule == NULL) { + return NULL; + } + + addr_rule->object = NXT_HTTP_ROUTE_SOURCE; + test->addr_rule = addr_rule; + test++; + } + + if (mtcf.destination != NULL) { + addr_rule = nxt_http_route_addr_rule_create(task, mp, mtcf.destination); + if (addr_rule == NULL) { + return NULL; + } + + addr_rule->object = NXT_HTTP_ROUTE_DESTINATION; + test->addr_rule = addr_rule; + test++; + } + return match; } @@ -770,6 +824,53 @@ nxt_http_route_rule_create(nxt_task_t *task, nxt_mp_t *mp, } +static nxt_http_route_addr_rule_t * +nxt_http_route_addr_rule_create(nxt_task_t *task, nxt_mp_t *mp, + nxt_conf_value_t *cv) +{ + size_t size; + uint32_t i, n; + nxt_bool_t array; + nxt_conf_value_t *value; + nxt_http_route_addr_rule_t *addr_rule; + nxt_http_route_addr_pattern_t *pattern; + + array = (nxt_conf_type(cv) == NXT_CONF_ARRAY); + n = array ? nxt_conf_array_elements_count(cv) : 1; + + size = sizeof(nxt_http_route_addr_rule_t) + + n * sizeof(nxt_http_route_addr_pattern_t); + + addr_rule = nxt_mp_alloc(mp, size); + if (nxt_slow_path(addr_rule == NULL)) { + return NULL; + } + + addr_rule->items = n; + + if (!array) { + pattern = &addr_rule->addr_pattern[0]; + + if (nxt_http_route_addr_pattern_parse(mp, pattern, cv) != NXT_OK) { + return NULL; + } + + return addr_rule; + } + + for (i = 0; i < n; i++) { + pattern = &addr_rule->addr_pattern[i]; + value = nxt_conf_get_array_element(cv, i); + + if (nxt_http_route_addr_pattern_parse(mp, pattern, value) != NXT_OK) { + return NULL; + } + } + + return addr_rule; +} + + static int nxt_http_pattern_compare(const void *one, const void *two) { @@ -1117,7 +1218,7 @@ nxt_http_route_handler(nxt_task_t *task, nxt_http_request_t *r, end = match + route->items; while (match < end) { - action = nxt_http_route_match(r, *match); + action = nxt_http_route_match(task, r, *match); if (action != NULL) { return action; } @@ -1132,7 +1233,8 @@ nxt_http_route_handler(nxt_task_t *task, nxt_http_request_t *r, static nxt_http_action_t * -nxt_http_route_match(nxt_http_request_t *r, nxt_http_route_match_t *match) +nxt_http_route_match(nxt_task_t *task, nxt_http_request_t *r, + nxt_http_route_match_t *match) { nxt_int_t ret; nxt_http_route_test_t *test, *end; @@ -1141,11 +1243,23 @@ nxt_http_route_match(nxt_http_request_t *r, nxt_http_route_match_t *match) end = test + match->items; while (test < end) { - if (test->rule->object != NXT_HTTP_ROUTE_TABLE) { - ret = nxt_http_route_rule(r, test->rule); - - } else { + switch (test->rule->object) { + case NXT_HTTP_ROUTE_TABLE: ret = nxt_http_route_table(r, test->table); + break; + case NXT_HTTP_ROUTE_SOURCE: + ret = nxt_http_route_addr_rule(r, test->addr_rule, r->remote); + break; + case NXT_HTTP_ROUTE_DESTINATION: + if (r->local == NULL && nxt_fast_path(r->proto.any != NULL)) { + nxt_http_proto[r->protocol].local_addr(task, r); + } + + ret = nxt_http_route_addr_rule(r, test->addr_rule, r->local); + break; + default: + ret = nxt_http_route_rule(r, test->rule); + break; } if (ret <= 0) { @@ -1256,6 +1370,154 @@ nxt_http_route_rule(nxt_http_request_t *r, nxt_http_route_rule_t *rule) static nxt_int_t +nxt_http_route_addr_pattern_match(nxt_http_route_addr_pattern_t *p, + nxt_sockaddr_t *sa) +{ +#if (NXT_INET6) + uint32_t i; +#endif + in_port_t in_port; + nxt_int_t match; + struct sockaddr_in *sin; +#if (NXT_INET6) + struct sockaddr_in6 *sin6; +#endif + nxt_http_route_addr_base_t *base; + + base = &p->base; + + switch (sa->u.sockaddr.sa_family) { + + case AF_INET: + + match = (base->addr_family == AF_INET + || base->addr_family == AF_UNSPEC); + if (!match) { + break; + } + + sin = &sa->u.sockaddr_in; + in_port = ntohs(sin->sin_port); + + match = (in_port >= base->port.start && in_port <= base->port.end); + if (!match) { + break; + } + + switch (base->match_type) { + + case NXT_HTTP_ROUTE_ADDR_ANY: + break; + + case NXT_HTTP_ROUTE_ADDR_EXACT: + match = (nxt_memcmp(&sin->sin_addr, &p->addr.v4.start, + sizeof(struct in_addr)) + == 0); + break; + + case NXT_HTTP_ROUTE_ADDR_RANGE: + match = (nxt_memcmp(&sin->sin_addr, &p->addr.v4.start, + sizeof(struct in_addr)) >= 0 + && nxt_memcmp(&sin->sin_addr, &p->addr.v4.end, + sizeof(struct in_addr)) <= 0); + break; + + case NXT_HTTP_ROUTE_ADDR_CIDR: + match = ((sin->sin_addr.s_addr & p->addr.v4.end) + == p->addr.v4.start); + break; + + default: + nxt_unreachable(); + } + + break; + +#if (NXT_INET6) + case AF_INET6: + + match = (base->addr_family == AF_INET6 + || base->addr_family == AF_UNSPEC); + if (!match) { + break; + } + + sin6 = &sa->u.sockaddr_in6; + in_port = ntohs(sin6->sin6_port); + + match = (in_port >= base->port.start && in_port <= base->port.end); + if (!match) { + break; + } + + switch (base->match_type) { + + case NXT_HTTP_ROUTE_ADDR_ANY: + break; + + case NXT_HTTP_ROUTE_ADDR_EXACT: + match = (nxt_memcmp(&sin6->sin6_addr, &p->addr.v6.start, + sizeof(struct in6_addr)) + == 0); + break; + + case NXT_HTTP_ROUTE_ADDR_RANGE: + match = (nxt_memcmp(&sin6->sin6_addr, &p->addr.v6.start, + sizeof(struct in6_addr)) >= 0 + && nxt_memcmp(&sin6->sin6_addr, &p->addr.v6.end, + sizeof(struct in6_addr)) <= 0); + break; + + case NXT_HTTP_ROUTE_ADDR_CIDR: + for (i = 0; i < 16; i++) { + match = ((sin6->sin6_addr.s6_addr[i] + & p->addr.v6.end.s6_addr[i]) + == p->addr.v6.start.s6_addr[i]); + + if (!match) { + break; + } + } + + break; + + default: + nxt_unreachable(); + } + + break; +#endif + + default: + match = 0; + break; + } + + return match ^ base->negative; +} + + +static nxt_int_t +nxt_http_route_addr_rule(nxt_http_request_t *r, + nxt_http_route_addr_rule_t *addr_rule, nxt_sockaddr_t *sa) +{ + uint32_t i, n; + nxt_http_route_addr_pattern_t *p; + + n = addr_rule->items; + + for (i = 0; i < n; i++) { + p = &addr_rule->addr_pattern[i]; + if (nxt_http_route_addr_pattern_match(p, sa)) { + return 1; + } + } + + return 0; +} + + +static nxt_int_t nxt_http_route_header(nxt_http_request_t *r, nxt_http_route_rule_t *rule) { nxt_int_t ret; diff --git a/src/nxt_http_route_addr.c b/src/nxt_http_route_addr.c new file mode 100644 index 00000000..3c7e9c84 --- /dev/null +++ b/src/nxt_http_route_addr.c @@ -0,0 +1,349 @@ + +/* + * Copyright (C) Axel Duch + * Copyright (C) NGINX, Inc. + */ + +#include <nxt_main.h> +#include <nxt_http_route_addr.h> + + +static nxt_bool_t nxt_str_looks_like_ipv6(const nxt_str_t *str); +#if (NXT_INET6) +static nxt_bool_t nxt_valid_ipv6_blocks(u_char *c, size_t len); +#endif + + +nxt_int_t +nxt_http_route_addr_pattern_parse(nxt_mp_t *mp, + nxt_http_route_addr_pattern_t *pattern, nxt_conf_value_t *cv) +{ + u_char *delim, *end; + nxt_int_t ret, cidr_prefix; + nxt_str_t addr, port; + nxt_http_route_addr_base_t *base; + nxt_http_route_addr_range_t *inet; + + if (nxt_conf_type(cv) != NXT_CONF_STRING) { + return NXT_ADDR_PATTERN_CV_TYPE_ERROR; + } + + nxt_conf_get_string(cv, &addr); + + base = &pattern->base; + + if (addr.length > 0 && addr.start[0] == '!') { + addr.start++; + addr.length--; + + base->negative = 1; + + } else { + base->negative = 0; + } + + if (nxt_slow_path(addr.length < 2)) { + return NXT_ADDR_PATTERN_LENGTH_ERROR; + } + + nxt_str_null(&port); + + if (addr.start[0] == '*' && addr.start[1] == ':') { + port.start = addr.start + 2; + port.length = addr.length - 2; + base->addr_family = AF_UNSPEC; + base->match_type = NXT_HTTP_ROUTE_ADDR_ANY; + + goto parse_port; + } + + if (nxt_str_looks_like_ipv6(&addr)) { +#if (NXT_INET6) + uint8_t i; + nxt_int_t len; + nxt_http_route_in6_addr_range_t *inet6; + + base->addr_family = AF_INET6; + + if (addr.start[0] == '[') { + addr.start++; + addr.length--; + + end = addr.start + addr.length; + + port.start = nxt_rmemstrn(addr.start, end, "]:", 2); + if (nxt_slow_path(port.start == NULL)) { + return NXT_ADDR_PATTERN_FORMAT_ERROR; + } + + addr.length = port.start - addr.start; + port.start += nxt_length("]:"); + port.length = end - port.start; + } + + inet6 = &pattern->addr.v6; + + delim = nxt_memchr(addr.start, '-', addr.length); + if (delim != NULL) { + len = delim - addr.start; + if (nxt_slow_path(!nxt_valid_ipv6_blocks(addr.start, len))) { + return NXT_ADDR_PATTERN_FORMAT_ERROR; + } + + ret = nxt_inet6_addr(&inet6->start, addr.start, len); + if (nxt_slow_path(ret != NXT_OK)) { + return NXT_ADDR_PATTERN_FORMAT_ERROR; + } + + len = addr.start + addr.length - delim - 1; + if (nxt_slow_path(!nxt_valid_ipv6_blocks(delim + 1, len))) { + return NXT_ADDR_PATTERN_FORMAT_ERROR; + } + + ret = nxt_inet6_addr(&inet6->end, delim + 1, len); + if (nxt_slow_path(ret != NXT_OK)) { + return NXT_ADDR_PATTERN_FORMAT_ERROR; + } + + if (nxt_slow_path(nxt_memcmp(&inet6->start, &inet6->end, + sizeof(struct in6_addr)) > 0)) + { + return NXT_ADDR_PATTERN_RANGE_OVERLAP_ERROR; + } + + base->match_type = NXT_HTTP_ROUTE_ADDR_RANGE; + + goto parse_port; + } + + delim = nxt_memchr(addr.start, '/', addr.length); + if (delim != NULL) { + cidr_prefix = nxt_int_parse(delim + 1, + addr.start + addr.length - (delim + 1)); + if (nxt_slow_path(cidr_prefix < 0 || cidr_prefix > 128)) { + return NXT_ADDR_PATTERN_CIDR_ERROR; + } + + addr.length = delim - addr.start; + if (nxt_slow_path(!nxt_valid_ipv6_blocks(addr.start, + addr.length))) + { + return NXT_ADDR_PATTERN_FORMAT_ERROR; + } + + ret = nxt_inet6_addr(&inet6->start, addr.start, addr.length); + if (nxt_slow_path(ret != NXT_OK)) { + return NXT_ADDR_PATTERN_FORMAT_ERROR; + } + + if (nxt_slow_path(cidr_prefix == 0)) { + base->match_type = NXT_HTTP_ROUTE_ADDR_ANY; + + goto parse_port; + } + + if (nxt_slow_path(cidr_prefix == 128)) { + base->match_type = NXT_HTTP_ROUTE_ADDR_EXACT; + + goto parse_port; + } + + base->match_type = NXT_HTTP_ROUTE_ADDR_CIDR; + + for (i = 0; i < sizeof(struct in6_addr); i++) { + if (cidr_prefix >= 8) { + inet6->end.s6_addr[i] = 0xFF; + cidr_prefix -= 8; + + continue; + } + + if (cidr_prefix > 0) { + inet6->end.s6_addr[i] = 0xFF & (0xFF << (8 - cidr_prefix)); + inet6->start.s6_addr[i] &= inet6->end.s6_addr[i]; + cidr_prefix = 0; + + continue; + } + + inet6->start.s6_addr[i] = 0; + inet6->end.s6_addr[i] = 0; + } + + goto parse_port; + } + + base->match_type = NXT_HTTP_ROUTE_ADDR_EXACT; + + if (nxt_slow_path(!nxt_valid_ipv6_blocks(addr.start, addr.length))) { + return NXT_ADDR_PATTERN_FORMAT_ERROR; + } + + nxt_inet6_addr(&inet6->start, addr.start, addr.length); + + goto parse_port; +#endif + return NXT_ADDR_PATTERN_NO_IPv6_ERROR; + } + + base->addr_family = AF_INET; + + delim = nxt_memchr(addr.start, ':', addr.length); + if (delim != NULL) { + port.start = delim + 1; + port.length = addr.start + addr.length - port.start; + addr.length = delim - addr.start; + } + + inet = &pattern->addr.v4; + + delim = nxt_memchr(addr.start, '-', addr.length); + if (delim != NULL) { + inet->start = nxt_inet_addr(addr.start, delim - addr.start); + if (nxt_slow_path(inet->start == INADDR_NONE)) { + return NXT_ADDR_PATTERN_FORMAT_ERROR; + } + + inet->end = nxt_inet_addr(delim + 1, + addr.start + addr.length - (delim + 1)); + if (nxt_slow_path(inet->end == INADDR_NONE)) { + return NXT_ADDR_PATTERN_FORMAT_ERROR; + } + + if (nxt_slow_path(nxt_memcmp(&inet->start, &inet->end, + sizeof(struct in_addr)) > 0)) + { + return NXT_ADDR_PATTERN_RANGE_OVERLAP_ERROR; + } + + base->match_type = NXT_HTTP_ROUTE_ADDR_RANGE; + + goto parse_port; + } + + delim = nxt_memchr(addr.start, '/', addr.length); + if (delim != NULL) { + cidr_prefix = nxt_int_parse(delim + 1, + addr.start + addr.length - (delim + 1)); + if (nxt_slow_path(cidr_prefix < 0 || cidr_prefix > 32)) { + return NXT_ADDR_PATTERN_CIDR_ERROR; + } + + addr.length = delim - addr.start; + inet->end = htonl(0xFFFFFFFF & (0xFFFFFFFF << (32 - cidr_prefix))); + + inet->start = nxt_inet_addr(addr.start, addr.length) & inet->end; + if (nxt_slow_path(inet->start == INADDR_NONE)) { + return NXT_ADDR_PATTERN_FORMAT_ERROR; + } + + if (cidr_prefix == 0) { + base->match_type = NXT_HTTP_ROUTE_ADDR_ANY; + + goto parse_port; + } + + if (cidr_prefix < 32) { + base->match_type = NXT_HTTP_ROUTE_ADDR_CIDR; + + goto parse_port; + } + } + + inet->start = nxt_inet_addr(addr.start, addr.length); + if (nxt_slow_path(inet->start == INADDR_NONE)) { + return NXT_ADDR_PATTERN_FORMAT_ERROR; + } + + base->match_type = NXT_HTTP_ROUTE_ADDR_EXACT; + +parse_port: + + if (port.length == 0) { + if (nxt_slow_path(port.start != NULL)) { + return NXT_ADDR_PATTERN_FORMAT_ERROR; + } + + base->port.start = 0; + base->port.end = 65535; + + return NXT_OK; + } + + delim = nxt_memchr(port.start, '-', port.length - 1); + if (delim != NULL) { + ret = nxt_int_parse(port.start, delim - port.start); + if (nxt_slow_path(ret < 0 || ret > 65535)) { + return NXT_ADDR_PATTERN_PORT_ERROR; + } + + base->port.start = ret; + + ret = nxt_int_parse(delim + 1, port.start + port.length - (delim + 1)); + if (nxt_slow_path(ret < base->port.start || ret > 65535)) { + return NXT_ADDR_PATTERN_PORT_ERROR; + } + + base->port.end = ret; + + } else { + ret = nxt_int_parse(port.start, port.length); + if (nxt_slow_path(ret < 0 || ret > 65535)) { + return NXT_ADDR_PATTERN_PORT_ERROR; + } + + base->port.start = ret; + base->port.end = ret; + } + + return NXT_OK; +} + + +static nxt_bool_t +nxt_str_looks_like_ipv6(const nxt_str_t *str) +{ + u_char *colon, *end; + + colon = nxt_memchr(str->start, ':', str->length); + + if (colon != NULL) { + end = str->start + str->length; + colon = nxt_memchr(colon + 1, ':', end - (colon + 1)); + } + + return (colon != NULL); +} + + +#if (NXT_INET6) + +static nxt_bool_t +nxt_valid_ipv6_blocks(u_char *c, size_t len) +{ + u_char *end; + nxt_uint_t colon_gap; + + end = c + len; + colon_gap = 0; + + while (c != end) { + if (*c == ':') { + colon_gap = 0; + c++; + + continue; + } + + colon_gap++; + c++; + + if (nxt_slow_path(colon_gap > 4)) { + return 0; + } + } + + return 1; +} + +#endif diff --git a/src/nxt_http_route_addr.h b/src/nxt_http_route_addr.h new file mode 100644 index 00000000..3b1e1da3 --- /dev/null +++ b/src/nxt_http_route_addr.h @@ -0,0 +1,73 @@ + +/* + * Copyright (C) Axel Duch + * Copyright (C) NGINX, Inc. + */ + +#include <nxt_conf.h> + +#ifndef _NXT_HTTP_ROUTE_ADDR_H_INCLUDED_ +#define _NXT_HTTP_ROUTE_ADDR_H_INCLUDED_ + + +enum { + NXT_HTTP_ROUTE_ADDR_ANY = 0, + NXT_HTTP_ROUTE_ADDR_RANGE, + NXT_HTTP_ROUTE_ADDR_EXACT, + NXT_HTTP_ROUTE_ADDR_CIDR, +}; + + +enum { + NXT_ADDR_PATTERN_PORT_ERROR = NXT_OK + 1, + NXT_ADDR_PATTERN_CV_TYPE_ERROR, + NXT_ADDR_PATTERN_LENGTH_ERROR, + NXT_ADDR_PATTERN_FORMAT_ERROR, + NXT_ADDR_PATTERN_RANGE_OVERLAP_ERROR, + NXT_ADDR_PATTERN_CIDR_ERROR, + NXT_ADDR_PATTERN_NO_IPv6_ERROR, +}; + + +typedef struct { + in_addr_t start; + in_addr_t end; +} nxt_http_route_addr_range_t; + + +#if (NXT_INET6) +typedef struct { + struct in6_addr start; + struct in6_addr end; +} nxt_http_route_in6_addr_range_t; +#endif + + +typedef struct { + uint8_t match_type:2; + uint8_t negative:1; + uint8_t addr_family; + + struct { + uint16_t start; + uint16_t end; + } port; +} nxt_http_route_addr_base_t; + + +typedef struct { + nxt_http_route_addr_base_t base; + + union { + nxt_http_route_addr_range_t v4; +#if (NXT_INET6) + nxt_http_route_in6_addr_range_t v6; +#endif + } addr; +} nxt_http_route_addr_pattern_t; + + +NXT_EXPORT nxt_int_t nxt_http_route_addr_pattern_parse(nxt_mp_t *mp, + nxt_http_route_addr_pattern_t *pattern, nxt_conf_value_t *cv); + +#endif /* _NXT_HTTP_ROUTE_ADDR_H_INCLUDED_ */ diff --git a/src/nxt_java.c b/src/nxt_java.c index 08e24595..004907d6 100644 --- a/src/nxt_java.c +++ b/src/nxt_java.c @@ -354,6 +354,7 @@ nxt_java_init(nxt_task_t *task, nxt_common_app_conf_t *conf) java_init.callbacks.close_handler = nxt_java_close_handler; java_init.request_data_size = sizeof(nxt_java_request_data_t); java_init.data = &data; + java_init.shm_limit = conf->shm_limit; ctx = nxt_unit_init(&java_init); if (nxt_slow_path(ctx == NULL)) { diff --git a/src/nxt_lib.c b/src/nxt_lib.c index db3d29c1..1634a2b8 100644 --- a/src/nxt_lib.c +++ b/src/nxt_lib.c @@ -43,6 +43,8 @@ nxt_lib_start(const char *app, char **argv, char ***envp) nxt_pid = getpid(); nxt_ppid = getppid(); + nxt_euid = geteuid(); + nxt_egid = getegid(); #if (NXT_DEBUG) diff --git a/src/nxt_main.h b/src/nxt_main.h index 0afebb96..d9e337d2 100644 --- a/src/nxt_main.h +++ b/src/nxt_main.h @@ -58,6 +58,7 @@ typedef uint16_t nxt_port_id_t; #include <nxt_thread.h> #include <nxt_process_type.h> #include <nxt_capability.h> +#include <nxt_credential.h> #include <nxt_process.h> #include <nxt_utf8.h> #include <nxt_file_name.h> diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c index cfe0341f..eed37752 100644 --- a/src/nxt_main_process.c +++ b/src/nxt_main_process.c @@ -29,6 +29,10 @@ typedef struct { } nxt_conf_app_map_t; +extern nxt_port_handlers_t nxt_controller_process_port_handlers; +extern nxt_port_handlers_t nxt_router_process_port_handlers; + + static nxt_int_t nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt); static void nxt_main_process_title(nxt_task_t *task); @@ -36,6 +40,8 @@ static nxt_int_t nxt_main_start_controller_process(nxt_task_t *task, nxt_runtime_t *rt); static nxt_int_t nxt_main_create_controller_process(nxt_task_t *task, nxt_runtime_t *rt, nxt_process_init_t *init); +static nxt_int_t nxt_main_create_router_process(nxt_task_t *task, nxt_runtime_t *rt, + nxt_process_init_t *init); static nxt_int_t nxt_main_start_router_process(nxt_task_t *task, nxt_runtime_t *rt); static nxt_int_t nxt_main_start_discovery_process(nxt_task_t *task, @@ -67,11 +73,29 @@ static void nxt_main_port_conf_store_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); static void nxt_main_port_access_log_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); +static nxt_process_init_t *nxt_process_init_create(nxt_task_t *task, + nxt_process_type_t type, const nxt_str_t *name); +static nxt_int_t nxt_process_init_name_set(nxt_process_init_t *init, + nxt_process_type_t type, const nxt_str_t *name); +static nxt_int_t nxt_process_init_creds_set(nxt_task_t *task, + nxt_process_init_t *init, nxt_str_t *user, nxt_str_t *group); + +static nxt_int_t nxt_init_isolation(nxt_task_t *task, + nxt_conf_value_t *isolation, nxt_process_init_t *init); +#if (NXT_HAVE_CLONE) +static nxt_int_t nxt_init_clone_flags(nxt_task_t *task, + nxt_conf_value_t *namespaces, nxt_process_init_t *init); +#endif -static nxt_int_t nxt_init_set_isolation(nxt_task_t *task, - nxt_process_init_t *init, nxt_conf_value_t *isolation); -static nxt_int_t nxt_init_set_ns(nxt_task_t *task, nxt_process_init_t *init, - nxt_conf_value_t *ns); +#if (NXT_HAVE_CLONE_NEWUSER) +static nxt_int_t nxt_init_isolation_creds(nxt_task_t *task, + nxt_conf_value_t *isolation, nxt_process_init_t *init); +static nxt_int_t nxt_init_vldt_isolation_creds(nxt_task_t *task, + nxt_process_init_t *init); +static nxt_int_t nxt_init_isolation_credential_map(nxt_task_t *task, + nxt_mp_t *mem_pool, nxt_conf_value_t *map_array, + nxt_clone_credential_map_t *map); +#endif const nxt_sig_event_t nxt_main_process_signals[] = { nxt_event_signal(SIGHUP, nxt_main_process_signal_handler), @@ -84,6 +108,54 @@ const nxt_sig_event_t nxt_main_process_signals[] = { }; +static const nxt_port_handlers_t nxt_app_process_port_handlers = { + .new_port = nxt_port_new_port_handler, + .change_file = nxt_port_change_log_file_handler, + .mmap = nxt_port_mmap_handler, + .remove_pid = nxt_port_remove_pid_handler, +}; + + +static const nxt_port_handlers_t nxt_discovery_process_port_handlers = { + .quit = nxt_worker_process_quit_handler, + .new_port = nxt_port_new_port_handler, + .change_file = nxt_port_change_log_file_handler, + .mmap = nxt_port_mmap_handler, + .data = nxt_port_data_handler, + .remove_pid = nxt_port_remove_pid_handler, + .rpc_ready = nxt_port_rpc_handler, + .rpc_error = nxt_port_rpc_handler, +}; + + +static const nxt_port_handlers_t *nxt_process_port_handlers[NXT_PROCESS_MAX] = +{ + NULL, + &nxt_discovery_process_port_handlers, + &nxt_controller_process_port_handlers, + &nxt_router_process_port_handlers, + &nxt_app_process_port_handlers +}; + + +static const nxt_process_start_t nxt_process_starts[NXT_PROCESS_MAX] = { + NULL, + nxt_discovery_start, + nxt_controller_start, + nxt_router_start, + nxt_app_start +}; + + +static const nxt_process_restart_t nxt_process_restarts[NXT_PROCESS_MAX] = { + NULL, + NULL, + &nxt_main_create_controller_process, + &nxt_main_create_router_process, + NULL +}; + + static nxt_bool_t nxt_exiting; @@ -143,7 +215,24 @@ static nxt_conf_map_t nxt_common_app_conf[] = { nxt_string("isolation"), NXT_CONF_MAP_PTR, offsetof(nxt_common_app_conf_t, isolation), - } + }, + + { + nxt_string("limits"), + NXT_CONF_MAP_PTR, + offsetof(nxt_common_app_conf_t, limits), + }, + +}; + + +static nxt_conf_map_t nxt_common_app_limits_conf[] = { + { + nxt_string("shm"), + NXT_CONF_MAP_SIZE, + offsetof(nxt_common_app_conf_t, shm_limit), + }, + }; @@ -309,6 +398,7 @@ nxt_port_main_start_worker_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) app_conf.name.start = start; app_conf.name.length = nxt_strlen(start); + app_conf.shm_limit = 100 * 1024 * 1024; start += app_conf.name.length + 1; @@ -355,6 +445,18 @@ nxt_port_main_start_worker_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) goto failed; } + if (app_conf.limits != NULL) { + ret = nxt_conf_map_object(mp, app_conf.limits, + nxt_common_app_limits_conf, + nxt_nitems(nxt_common_app_limits_conf), + &app_conf); + + if (nxt_slow_path(ret != NXT_OK)) { + nxt_alert(task, "failed to map app limits received from router"); + goto failed; + } + } + ret = nxt_main_start_worker_process(task, task->thread->runtime, &app_conf, msg->port_msg.stream); @@ -453,20 +555,13 @@ nxt_main_start_controller_process(nxt_task_t *task, nxt_runtime_t *rt) { nxt_process_init_t *init; - init = nxt_mp_zalloc(rt->mem_pool, sizeof(nxt_process_init_t)); + static const nxt_str_t name = nxt_string("controller"); + + init = nxt_process_init_create(task, NXT_PROCESS_CONTROLLER, &name); if (nxt_slow_path(init == NULL)) { return NXT_ERROR; } - init->start = nxt_controller_start; - init->name = "controller"; - init->user_cred = &rt->user_cred; - init->port_handlers = &nxt_controller_process_port_handlers; - init->signals = nxt_worker_process_signals; - init->type = NXT_PROCESS_CONTROLLER; - init->stream = 0; - init->restart = &nxt_main_create_controller_process; - return nxt_main_create_controller_process(task, rt, init);; } @@ -547,21 +642,13 @@ nxt_main_start_discovery_process(nxt_task_t *task, nxt_runtime_t *rt) { nxt_process_init_t *init; - init = nxt_mp_zalloc(rt->mem_pool, sizeof(nxt_process_init_t)); + static const nxt_str_t name = nxt_string("discovery"); + + init = nxt_process_init_create(task, NXT_PROCESS_DISCOVERY, &name); if (nxt_slow_path(init == NULL)) { return NXT_ERROR; } - init->start = nxt_discovery_start; - init->name = "discovery"; - init->user_cred = &rt->user_cred; - init->port_handlers = &nxt_discovery_process_port_handlers; - init->signals = nxt_worker_process_signals; - init->type = NXT_PROCESS_DISCOVERY; - init->data = rt; - init->stream = 0; - init->restart = NULL; - return nxt_main_create_worker_process(task, rt, init); } @@ -571,72 +658,59 @@ nxt_main_start_router_process(nxt_task_t *task, nxt_runtime_t *rt) { nxt_process_init_t *init; - init = nxt_mp_zalloc(rt->mem_pool, sizeof(nxt_process_init_t)); + static const nxt_str_t name = nxt_string("router"); + + init = nxt_process_init_create(task, NXT_PROCESS_ROUTER, &name); if (nxt_slow_path(init == NULL)) { return NXT_ERROR; } - init->start = nxt_router_start; - init->name = "router"; - init->user_cred = &rt->user_cred; - init->port_handlers = &nxt_router_process_port_handlers; - init->signals = nxt_worker_process_signals; - init->type = NXT_PROCESS_ROUTER; - init->data = rt; - init->stream = 0; - init->restart = &nxt_main_create_worker_process; + return nxt_main_create_router_process(task, rt, init); +} + + +static nxt_int_t +nxt_main_create_router_process(nxt_task_t *task, nxt_runtime_t *rt, + nxt_process_init_t *init) +{ + nxt_main_stop_worker_processes(task, rt); return nxt_main_create_worker_process(task, rt, init); } + static nxt_int_t nxt_main_start_worker_process(nxt_task_t *task, nxt_runtime_t *rt, nxt_common_app_conf_t *app_conf, uint32_t stream) { - char *user, *group; - u_char *title, *last, *end; - size_t size; + nxt_int_t cap_setid; nxt_int_t ret; nxt_process_init_t *init; - size = sizeof(nxt_process_init_t) - + app_conf->name.length - + sizeof("\"\" application"); - - if (rt->capabilities.setid) { - size += sizeof(nxt_user_cred_t) - + app_conf->user.length + 1 - + app_conf->group.length + 1; - } - - init = nxt_mp_zalloc(rt->mem_pool, size); + init = nxt_process_init_create(task, NXT_PROCESS_WORKER, &app_conf->name); if (nxt_slow_path(init == NULL)) { return NXT_ERROR; } - if (rt->capabilities.setid) { - init->user_cred = nxt_pointer_to(init, sizeof(nxt_process_init_t)); - user = nxt_pointer_to(init->user_cred, sizeof(nxt_user_cred_t)); - - nxt_memcpy(user, app_conf->user.start, app_conf->user.length); - last = nxt_pointer_to(user, app_conf->user.length); - *last++ = '\0'; + cap_setid = rt->capabilities.setid; - init->user_cred->user = user; - - if (app_conf->group.start != NULL) { - group = (char *) last; - - nxt_memcpy(group, app_conf->group.start, app_conf->group.length); - last = nxt_pointer_to(group, app_conf->group.length); - *last++ = '\0'; - - } else { - group = NULL; + if (app_conf->isolation != NULL) { + ret = nxt_init_isolation(task, app_conf->isolation, init); + if (nxt_slow_path(ret != NXT_OK)) { + goto fail; } + } - ret = nxt_user_cred_get(task, init->user_cred, group); - if (ret != NXT_OK) { +#if (NXT_HAVE_CLONE_NEWUSER) + if (NXT_CLONE_USER(init->isolation.clone.flags)) { + cap_setid = 1; + } +#endif + + if (cap_setid) { + ret = nxt_process_init_creds_set(task, init, &app_conf->user, + &app_conf->group); + if (nxt_slow_path(ret != NXT_OK)) { goto fail; } @@ -658,40 +732,29 @@ nxt_main_start_worker_process(nxt_task_t *task, nxt_runtime_t *rt, &app_conf->name); goto fail; } - - last = nxt_pointer_to(init, sizeof(nxt_process_init_t)); } - title = last; - end = title + app_conf->name.length + sizeof("\"\" application"); - - nxt_sprintf(title, end, "\"%V\" application%Z", &app_conf->name); - - init->start = nxt_app_start; - init->name = (char *) title; - init->port_handlers = &nxt_app_process_port_handlers; - init->signals = nxt_worker_process_signals; - init->type = NXT_PROCESS_WORKER; init->data = app_conf; init->stream = stream; - init->restart = NULL; - ret = nxt_init_set_isolation(task, init, app_conf->isolation); +#if (NXT_HAVE_CLONE_NEWUSER) + ret = nxt_init_vldt_isolation_creds(task, init); if (nxt_slow_path(ret != NXT_OK)) { goto fail; } +#endif return nxt_main_create_worker_process(task, rt, init); fail: - nxt_mp_free(rt->mem_pool, init); + nxt_mp_destroy(init->mem_pool); return NXT_ERROR; } -static nxt_int_t +nxt_int_t nxt_main_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt, nxt_process_init_t *init) { @@ -706,7 +769,7 @@ nxt_main_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt, process = nxt_runtime_process_new(rt); if (nxt_slow_path(process == NULL)) { - nxt_mp_free(rt->mem_pool, init); + nxt_mp_destroy(init->mem_pool); return NXT_ERROR; } @@ -972,12 +1035,13 @@ nxt_main_process_signal_handler(nxt_task_t *task, void *obj, void *data) static void nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid) { - nxt_buf_t *buf; - nxt_port_t *port; - nxt_runtime_t *rt; - nxt_process_t *process; - nxt_process_type_t ptype; - nxt_process_init_t *init; + nxt_buf_t *buf; + nxt_port_t *port; + nxt_runtime_t *rt; + nxt_process_t *process; + nxt_process_type_t ptype; + nxt_process_init_t *init; + nxt_process_restart_t restart; rt = task->thread->runtime; @@ -988,6 +1052,7 @@ nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid) process->init = NULL; ptype = nxt_process_type(process); + restart = nxt_process_restarts[ptype]; if (process->ready) { init->stream = 0; @@ -996,6 +1061,8 @@ nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid) nxt_process_close_ports(task, process); if (nxt_exiting) { + nxt_mp_destroy(init->mem_pool); + if (rt->nprocesses <= 2) { nxt_runtime_quit(task, 0); } @@ -1030,15 +1097,11 @@ nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid) -1, init->stream, 0, buf); } nxt_runtime_process_loop; - if (init->restart != NULL) { - if (init->type == NXT_PROCESS_ROUTER) { - nxt_main_stop_worker_processes(task, rt); - } - - init->restart(task, rt, init); + if (restart != NULL) { + restart(task, rt, init); } else { - nxt_mp_free(rt->mem_pool, init); + nxt_mp_destroy(init->mem_pool); } } } @@ -1484,45 +1547,178 @@ nxt_main_port_access_log_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) static nxt_int_t -nxt_init_set_isolation(nxt_task_t *task, nxt_process_init_t *init, - nxt_conf_value_t *isolation) +nxt_init_isolation(nxt_task_t *task, nxt_conf_value_t *isolation, + nxt_process_init_t *init) +{ +#if (NXT_HAVE_CLONE) + nxt_int_t ret; + nxt_conf_value_t *obj; + + static nxt_str_t nsname = nxt_string("namespaces"); + + obj = nxt_conf_get_object_member(isolation, &nsname, NULL); + if (obj != NULL) { + ret = nxt_init_clone_flags(task, obj, init); + if (nxt_slow_path(ret != NXT_OK)) { + return NXT_ERROR; + } + } +#endif + +#if (NXT_HAVE_CLONE_NEWUSER) + ret = nxt_init_isolation_creds(task, isolation, init); + if (nxt_slow_path(ret != NXT_OK)) { + return NXT_ERROR; + } +#endif + + return NXT_OK; +} + + +#if (NXT_HAVE_CLONE_NEWUSER) + +static nxt_int_t +nxt_init_isolation_creds(nxt_task_t *task, nxt_conf_value_t *isolation, + nxt_process_init_t *init) { nxt_int_t ret; - nxt_conf_value_t *object; + nxt_clone_t *clone; + nxt_conf_value_t *array; - static nxt_str_t nsname = nxt_string("namespaces"); static nxt_str_t uidname = nxt_string("uidmap"); static nxt_str_t gidname = nxt_string("gidmap"); - if (isolation == NULL) { + clone = &init->isolation.clone; + + array = nxt_conf_get_object_member(isolation, &uidname, NULL); + if (array != NULL) { + ret = nxt_init_isolation_credential_map(task, init->mem_pool, array, + &clone->uidmap); + + if (nxt_slow_path(ret != NXT_OK)) { + return NXT_ERROR; + } + } + + array = nxt_conf_get_object_member(isolation, &gidname, NULL); + if (array != NULL) { + ret = nxt_init_isolation_credential_map(task, init->mem_pool, array, + &clone->gidmap); + + if (nxt_slow_path(ret != NXT_OK)) { + return NXT_ERROR; + } + } + + return NXT_OK; +} + + +static nxt_int_t +nxt_init_vldt_isolation_creds(nxt_task_t *task, nxt_process_init_t *init) +{ + nxt_int_t ret; + nxt_clone_t *clone; + + clone = &init->isolation.clone; + + if (clone->uidmap.size == 0 && clone->gidmap.size == 0) { return NXT_OK; } - object = nxt_conf_get_object_member(isolation, &nsname, NULL); - if (object != NULL) { - ret = nxt_init_set_ns(task, init, object); - if (ret != NXT_OK) { - return ret; + if (!NXT_CLONE_USER(clone->flags)) { + if (nxt_slow_path(clone->uidmap.size > 0)) { + nxt_log(task, NXT_LOG_ERR, "\"uidmap\" is set but " + "\"isolation.namespaces.credential\" is false or unset"); + + return NXT_ERROR; + } + + if (nxt_slow_path(clone->gidmap.size > 0)) { + nxt_log(task, NXT_LOG_ERR, "\"gidmap\" is set but " + "\"isolation.namespaces.credential\" is false or unset"); + + return NXT_ERROR; } + + return NXT_OK; + } + + ret = nxt_clone_vldt_credential_uidmap(task, &clone->uidmap, + init->user_cred); + + if (nxt_slow_path(ret != NXT_OK)) { + return NXT_ERROR; + } + + return nxt_clone_vldt_credential_gidmap(task, &clone->gidmap, + init->user_cred); +} + + +static nxt_int_t +nxt_init_isolation_credential_map(nxt_task_t *task, nxt_mp_t *mem_pool, + nxt_conf_value_t *map_array, nxt_clone_credential_map_t *map) +{ + nxt_int_t ret; + nxt_uint_t i; + nxt_conf_value_t *obj; + + static nxt_conf_map_t nxt_clone_map_entry_conf[] = { + { + nxt_string("container"), + NXT_CONF_MAP_INT, + offsetof(nxt_clone_map_entry_t, container), + }, + + { + nxt_string("host"), + NXT_CONF_MAP_INT, + offsetof(nxt_clone_map_entry_t, host), + }, + + { + nxt_string("size"), + NXT_CONF_MAP_INT, + offsetof(nxt_clone_map_entry_t, size), + }, + }; + + map->size = nxt_conf_array_elements_count(map_array); + + if (map->size == 0) { + return NXT_OK; } - object = nxt_conf_get_object_member(isolation, &uidname, NULL); - if (object != NULL) { - init->isolation.clone.uidmap = object; + map->map = nxt_mp_alloc(mem_pool, + map->size * sizeof(nxt_clone_map_entry_t)); + if (nxt_slow_path(map->map == NULL)) { + return NXT_ERROR; } - object = nxt_conf_get_object_member(isolation, &gidname, NULL); - if (object != NULL) { - init->isolation.clone.gidmap = object; + for (i = 0; i < map->size; i++) { + obj = nxt_conf_get_array_element(map_array, i); + + ret = nxt_conf_map_object(mem_pool, obj, nxt_clone_map_entry_conf, + nxt_nitems(nxt_clone_map_entry_conf), + map->map + i); + if (nxt_slow_path(ret != NXT_OK)) { + nxt_alert(task, "clone map entry map error"); + return NXT_ERROR; + } } return NXT_OK; } +#endif + +#if (NXT_HAVE_CLONE) static nxt_int_t -nxt_init_set_ns(nxt_task_t *task, nxt_process_init_t *init, - nxt_conf_value_t *namespaces) +nxt_init_clone_flags(nxt_task_t *task, nxt_conf_value_t *namespaces, + nxt_process_init_t *init) { uint32_t index; nxt_str_t name; @@ -1588,3 +1784,122 @@ nxt_init_set_ns(nxt_task_t *task, nxt_process_init_t *init, return NXT_OK; } + +#endif + + +static nxt_process_init_t * +nxt_process_init_create(nxt_task_t *task, nxt_process_type_t type, + const nxt_str_t *name) +{ + nxt_mp_t *mp; + nxt_int_t ret; + nxt_runtime_t *rt; + nxt_process_init_t *init; + + mp = nxt_mp_create(1024, 128, 256, 32); + if (nxt_slow_path(mp == NULL)) { + return NULL; + } + + init = nxt_mp_zalloc(mp, sizeof(nxt_process_init_t)); + if (nxt_slow_path(init == NULL)) { + goto fail; + } + + init->mem_pool = mp; + + ret = nxt_process_init_name_set(init, type, name); + if (nxt_slow_path(ret != NXT_OK)) { + goto fail; + } + + rt = task->thread->runtime; + + init->type = type; + init->start = nxt_process_starts[type]; + init->port_handlers = nxt_process_port_handlers[type]; + init->signals = nxt_worker_process_signals; + init->user_cred = &rt->user_cred; + init->data = &rt; + + return init; + +fail: + + nxt_mp_destroy(mp); + + return NULL; +} + + +static nxt_int_t +nxt_process_init_name_set(nxt_process_init_t *init, nxt_process_type_t type, + const nxt_str_t *name) +{ + u_char *str, *end; + size_t size; + const char *fmt; + + size = name->length + 1; + + if (type == NXT_PROCESS_WORKER) { + size += nxt_length("\"\" application"); + fmt = "\"%V\" application%Z"; + + } else { + fmt = "%V%Z"; + } + + str = nxt_mp_alloc(init->mem_pool, size); + if (nxt_slow_path(str == NULL)) { + return NXT_ERROR; + } + + end = str + size; + + nxt_sprintf(str, end, fmt, name); + + init->name = (char *) str; + + return NXT_OK; +} + + +static nxt_int_t +nxt_process_init_creds_set(nxt_task_t *task, nxt_process_init_t *init, + nxt_str_t *user, nxt_str_t *group) +{ + char *str; + + init->user_cred = nxt_mp_zalloc(init->mem_pool, sizeof(nxt_credential_t)); + + if (nxt_slow_path(init->user_cred == NULL)) { + return NXT_ERROR; + } + + str = nxt_mp_zalloc(init->mem_pool, user->length + 1); + if (nxt_slow_path(str == NULL)) { + return NXT_ERROR; + } + + nxt_memcpy(str, user->start, user->length); + str[user->length] = '\0'; + + init->user_cred->user = str; + + if (group->start != NULL) { + str = nxt_mp_zalloc(init->mem_pool, group->length + 1); + if (nxt_slow_path(str == NULL)) { + return NXT_ERROR; + } + + nxt_memcpy(str, group->start, group->length); + str[group->length] = '\0'; + + } else { + str = NULL; + } + + return nxt_credential_get(task, init->mem_pool, init->user_cred, str); +} diff --git a/src/nxt_main_process.h b/src/nxt_main_process.h index d932e11f..b0570a84 100644 --- a/src/nxt_main_process.h +++ b/src/nxt_main_process.h @@ -36,10 +36,7 @@ nxt_int_t nxt_router_start(nxt_task_t *task, void *data); nxt_int_t nxt_discovery_start(nxt_task_t *task, void *data); nxt_int_t nxt_app_start(nxt_task_t *task, void *data); -extern nxt_port_handlers_t nxt_controller_process_port_handlers; -extern nxt_port_handlers_t nxt_discovery_process_port_handlers; -extern nxt_port_handlers_t nxt_app_process_port_handlers; -extern nxt_port_handlers_t nxt_router_process_port_handlers; + extern const nxt_sig_event_t nxt_main_process_signals[]; extern const nxt_sig_event_t nxt_worker_process_signals[]; diff --git a/src/nxt_php_sapi.c b/src/nxt_php_sapi.c index 7a5e0a3b..0f6ce686 100644 --- a/src/nxt_php_sapi.c +++ b/src/nxt_php_sapi.c @@ -352,6 +352,7 @@ nxt_php_init(nxt_task_t *task, nxt_common_app_conf_t *conf) nxt_fd_blocking(task, my_port->pair[0]); php_init.log_fd = 2; + php_init.shm_limit = conf->shm_limit; unit_ctx = nxt_unit_init(&php_init); if (nxt_slow_path(unit_ctx == NULL)) { diff --git a/src/nxt_port.c b/src/nxt_port.c index 8d14a5e7..70cf33e6 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -140,7 +140,7 @@ nxt_port_reset_next_id() void nxt_port_enable(nxt_task_t *task, nxt_port_t *port, - nxt_port_handlers_t *handlers) + const nxt_port_handlers_t *handlers) { port->pid = nxt_pid; port->handler = nxt_port_handler; diff --git a/src/nxt_port.h b/src/nxt_port.h index eeb6caa5..c6f15238 100644 --- a/src/nxt_port.h +++ b/src/nxt_port.h @@ -44,6 +44,9 @@ struct nxt_port_handlers_s { /* Various data. */ nxt_port_handler_t data; + + nxt_port_handler_t oosm; + nxt_port_handler_t shm_ack; }; @@ -82,6 +85,9 @@ typedef enum { _NXT_PORT_MSG_DATA = nxt_port_handler_idx(data), + _NXT_PORT_MSG_OOSM = nxt_port_handler_idx(oosm), + _NXT_PORT_MSG_SHM_ACK = nxt_port_handler_idx(shm_ack), + NXT_PORT_MSG_MAX = sizeof(nxt_port_handlers_t) / sizeof(nxt_port_handler_t), @@ -114,6 +120,9 @@ typedef enum { NXT_PORT_MSG_DATA = _NXT_PORT_MSG_DATA, NXT_PORT_MSG_DATA_LAST = _NXT_PORT_MSG_DATA | NXT_PORT_MSG_LAST, + + NXT_PORT_MSG_OOSM = _NXT_PORT_MSG_OOSM | NXT_PORT_MSG_LAST, + NXT_PORT_MSG_SHM_ACK = _NXT_PORT_MSG_SHM_ACK | NXT_PORT_MSG_LAST, } nxt_port_msg_type_t; @@ -269,7 +278,7 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, } void nxt_port_enable(nxt_task_t *task, nxt_port_t *port, - nxt_port_handlers_t *handlers); + const nxt_port_handlers_t *handlers); nxt_int_t nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, nxt_port_t *new_port, uint32_t stream); void nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt, diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c index b7068c88..24a40406 100644 --- a/src/nxt_port_memory.c +++ b/src/nxt_port_memory.c @@ -112,6 +112,8 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data) u_char *p; nxt_mp_t *mp; nxt_buf_t *b; + nxt_port_t *port; + nxt_process_t *process; nxt_chunk_id_t c; nxt_port_mmap_header_t *hdr; nxt_port_mmap_handler_t *mmap_handler; @@ -163,6 +165,21 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data) c++; } + if (hdr->dst_pid == nxt_pid + && nxt_atomic_cmp_set(&hdr->oosm, 1, 0)) + { + process = nxt_runtime_process_find(task->thread->runtime, hdr->src_pid); + + if (process != NULL && !nxt_queue_is_empty(&process->ports)) { + port = nxt_process_port_first(process); + + if (port->type == NXT_PROCESS_WORKER) { + (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_SHM_ACK, + -1, 0, 0, NULL); + } + } + } + release_buf: nxt_port_mmap_handler_use(mmap_handler, -1); @@ -454,6 +471,8 @@ nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c, goto unlock_return; } } + + hdr->oosm = 1; } /* TODO introduce port_mmap limit and release wait. */ diff --git a/src/nxt_port_memory_int.h b/src/nxt_port_memory_int.h index 53dfaebf..87c3d833 100644 --- a/src/nxt_port_memory_int.h +++ b/src/nxt_port_memory_int.h @@ -51,6 +51,7 @@ struct nxt_port_mmap_header_s { nxt_pid_t src_pid; /* For sanity check. */ nxt_pid_t dst_pid; /* For sanity check. */ nxt_port_id_t sent_over; + nxt_atomic_t oosm; nxt_free_map_t free_map[MAX_FREE_IDX]; nxt_free_map_t free_map_padding; nxt_free_map_t free_tracking_map[MAX_FREE_IDX]; diff --git a/src/nxt_process.c b/src/nxt_process.c index b246a58c..035f747f 100644 --- a/src/nxt_process.c +++ b/src/nxt_process.c @@ -14,7 +14,6 @@ #include <signal.h> static void nxt_process_start(nxt_task_t *task, nxt_process_t *process); -static nxt_int_t nxt_user_groups_get(nxt_task_t *task, nxt_user_cred_t *uc); static nxt_int_t nxt_process_worker_setup(nxt_task_t *task, nxt_process_t *process, int parentfd); @@ -24,6 +23,12 @@ nxt_pid_t nxt_pid; /* An original parent process pid. */ nxt_pid_t nxt_ppid; +/* A cached process effective uid */ +nxt_uid_t nxt_euid; + +/* A cached process effective gid */ +nxt_gid_t nxt_egid; + nxt_bool_t nxt_proc_conn_matrix[NXT_PROCESS_MAX][NXT_PROCESS_MAX] = { { 1, 1, 1, 1, 1 }, { 1, 0, 0, 0, 0 }, @@ -208,8 +213,9 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process) } #if (NXT_HAVE_CLONE && NXT_HAVE_CLONE_NEWUSER) - if ((init->isolation.clone.flags & CLONE_NEWUSER) == CLONE_NEWUSER) { - ret = nxt_clone_proc_map(task, pid, &init->isolation.clone); + if (NXT_CLONE_USER(init->isolation.clone.flags)) { + ret = nxt_clone_credential_map(task, pid, init->user_cred, + &init->isolation.clone); if (nxt_slow_path(ret != NXT_OK)) { goto fail; } @@ -258,7 +264,7 @@ cleanup: static void nxt_process_start(nxt_task_t *task, nxt_process_t *process) { - nxt_int_t ret; + nxt_int_t ret, cap_setid; nxt_port_t *port, *main_port; nxt_thread_t *thread; nxt_runtime_t *rt; @@ -277,9 +283,22 @@ nxt_process_start(nxt_task_t *task, nxt_process_t *process) nxt_random_init(&thread->random); - if (rt->capabilities.setid && init->user_cred != NULL) { - ret = nxt_user_cred_set(task, init->user_cred); - if (ret != NXT_OK) { + cap_setid = rt->capabilities.setid; + +#if (NXT_HAVE_CLONE_NEWUSER) + if (!cap_setid && NXT_CLONE_USER(init->isolation.clone.flags)) { + cap_setid = 1; + } +#endif + + if (cap_setid) { + ret = nxt_credential_setgids(task, init->user_cred); + if (nxt_slow_path(ret != NXT_OK)) { + goto fail; + } + + ret = nxt_credential_setuid(task, init->user_cred); + if (nxt_slow_path(ret != NXT_OK)) { goto fail; } } @@ -525,264 +544,6 @@ nxt_nanosleep(nxt_nsec_t ns) } -nxt_int_t -nxt_user_cred_get(nxt_task_t *task, nxt_user_cred_t *uc, const char *group) -{ - struct group *grp; - struct passwd *pwd; - - nxt_errno = 0; - - pwd = getpwnam(uc->user); - - if (nxt_slow_path(pwd == NULL)) { - - if (nxt_errno == 0) { - nxt_alert(task, "getpwnam(\"%s\") failed, user \"%s\" not found", - uc->user, uc->user); - } else { - nxt_alert(task, "getpwnam(\"%s\") failed %E", uc->user, nxt_errno); - } - - return NXT_ERROR; - } - - uc->uid = pwd->pw_uid; - uc->base_gid = pwd->pw_gid; - - if (group != NULL && group[0] != '\0') { - nxt_errno = 0; - - grp = getgrnam(group); - - if (nxt_slow_path(grp == NULL)) { - - if (nxt_errno == 0) { - nxt_alert(task, - "getgrnam(\"%s\") failed, group \"%s\" not found", - group, group); - } else { - nxt_alert(task, "getgrnam(\"%s\") failed %E", group, nxt_errno); - } - - return NXT_ERROR; - } - - uc->base_gid = grp->gr_gid; - } - - return nxt_user_groups_get(task, uc); -} - - -/* - * nxt_user_groups_get() stores an array of groups IDs which should be - * set by the initgroups() function for a given user. The initgroups() - * may block a just forked worker process for some time if LDAP or NDIS+ - * is used, so nxt_user_groups_get() allows to get worker user groups in - * main process. In a nutshell the initgroups() calls getgrouplist() - * followed by setgroups(). However Solaris lacks the getgrouplist(). - * Besides getgrouplist() does not allow to query the exact number of - * groups while NGROUPS_MAX can be quite large (e.g. 65536 on Linux). - * So nxt_user_groups_get() emulates getgrouplist(): at first the function - * saves the super-user groups IDs, then calls initgroups() and saves the - * specified user groups IDs, and then restores the super-user groups IDs. - * This works at least on Linux, FreeBSD, and Solaris, but does not work - * on MacOSX, getgroups(2): - * - * To provide compatibility with applications that use getgroups() in - * environments where users may be in more than {NGROUPS_MAX} groups, - * a variant of getgroups(), obtained when compiling with either the - * macros _DARWIN_UNLIMITED_GETGROUPS or _DARWIN_C_SOURCE defined, can - * be used that is not limited to {NGROUPS_MAX} groups. However, this - * variant only returns the user's default group access list and not - * the group list modified by a call to setgroups(2). - * - * For such cases initgroups() is used in worker process as fallback. - */ - -static nxt_int_t -nxt_user_groups_get(nxt_task_t *task, nxt_user_cred_t *uc) -{ - int nsaved, ngroups; - nxt_int_t ret; - nxt_gid_t *saved; - - nsaved = getgroups(0, NULL); - - if (nsaved == -1) { - nxt_alert(task, "getgroups(0, NULL) failed %E", nxt_errno); - return NXT_ERROR; - } - - nxt_debug(task, "getgroups(0, NULL): %d", nsaved); - - if (nsaved > NGROUPS_MAX) { - /* MacOSX case. */ - - uc->gids = NULL; - uc->ngroups = 0; - - return NXT_OK; - } - - saved = nxt_malloc(nsaved * sizeof(nxt_gid_t)); - - if (saved == NULL) { - return NXT_ERROR; - } - - ret = NXT_ERROR; - - nsaved = getgroups(nsaved, saved); - - if (nsaved == -1) { - nxt_alert(task, "getgroups(%d) failed %E", nsaved, nxt_errno); - goto free; - } - - nxt_debug(task, "getgroups(): %d", nsaved); - - if (initgroups(uc->user, uc->base_gid) != 0) { - if (nxt_errno == NXT_EPERM) { - nxt_log(task, NXT_LOG_NOTICE, - "initgroups(%s, %d) failed %E, ignored", - uc->user, uc->base_gid, nxt_errno); - - ret = NXT_OK; - - goto free; - - } else { - nxt_alert(task, "initgroups(%s, %d) failed %E", - uc->user, uc->base_gid, nxt_errno); - goto restore; - } - } - - ngroups = getgroups(0, NULL); - - if (ngroups == -1) { - nxt_alert(task, "getgroups(0, NULL) failed %E", nxt_errno); - goto restore; - } - - nxt_debug(task, "getgroups(0, NULL): %d", ngroups); - - uc->gids = nxt_malloc(ngroups * sizeof(nxt_gid_t)); - - if (uc->gids == NULL) { - goto restore; - } - - ngroups = getgroups(ngroups, uc->gids); - - if (ngroups == -1) { - nxt_alert(task, "getgroups(%d) failed %E", ngroups, nxt_errno); - goto restore; - } - - uc->ngroups = ngroups; - -#if (NXT_DEBUG) - { - u_char *p, *end; - nxt_uint_t i; - u_char msg[NXT_MAX_ERROR_STR]; - - p = msg; - end = msg + NXT_MAX_ERROR_STR; - - for (i = 0; i < uc->ngroups; i++) { - p = nxt_sprintf(p, end, "%uL:", (uint64_t) uc->gids[i]); - } - - nxt_debug(task, "user \"%s\" cred: uid:%uL base gid:%uL, gids:%*s", - uc->user, (uint64_t) uc->uid, (uint64_t) uc->base_gid, - p - msg, msg); - } -#endif - - ret = NXT_OK; - -restore: - - if (setgroups(nsaved, saved) != 0) { - nxt_alert(task, "setgroups(%d) failed %E", nsaved, nxt_errno); - ret = NXT_ERROR; - } - -free: - - nxt_free(saved); - - return ret; -} - - -nxt_int_t -nxt_user_cred_set(nxt_task_t *task, nxt_user_cred_t *uc) -{ - nxt_debug(task, "user cred set: \"%s\" uid:%d base gid:%d", - uc->user, uc->uid, uc->base_gid); - - if (setgid(uc->base_gid) != 0) { - -#if (NXT_HAVE_CLONE) - if (nxt_errno == EINVAL) { - nxt_log(task, NXT_LOG_ERR, "The gid %d isn't valid in the " - "application namespace.", uc->base_gid); - return NXT_ERROR; - } -#endif - - nxt_alert(task, "setgid(%d) failed %E", uc->base_gid, nxt_errno); - return NXT_ERROR; - } - - if (uc->gids != NULL) { - if (setgroups(uc->ngroups, uc->gids) != 0) { - -#if (NXT_HAVE_CLONE) - if (nxt_errno == EINVAL) { - nxt_log(task, NXT_LOG_ERR, "The user \"%s\" (uid: %d) has " - "supplementary group ids not valid in the application " - "namespace.", uc->user, uc->uid); - return NXT_ERROR; - } -#endif - - nxt_alert(task, "setgroups(%i) failed %E", uc->ngroups, nxt_errno); - return NXT_ERROR; - } - - } else { - /* MacOSX fallback. */ - if (initgroups(uc->user, uc->base_gid) != 0) { - nxt_alert(task, "initgroups(%s, %d) failed %E", - uc->user, uc->base_gid, nxt_errno); - return NXT_ERROR; - } - } - - if (setuid(uc->uid) != 0) { - -#if (NXT_HAVE_CLONE) - if (nxt_errno == EINVAL) { - nxt_log(task, NXT_LOG_ERR, "The uid %d (user \"%s\") isn't " - "valid in the application namespace.", uc->uid, uc->user); - return NXT_ERROR; - } -#endif - - nxt_alert(task, "setuid(%d) failed %E", uc->uid, nxt_errno); - return NXT_ERROR; - } - - return NXT_OK; -} - - void nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i) { diff --git a/src/nxt_process.h b/src/nxt_process.h index d67573f1..343fffb8 100644 --- a/src/nxt_process.h +++ b/src/nxt_process.h @@ -7,27 +7,13 @@ #ifndef _NXT_PROCESS_H_INCLUDED_ #define _NXT_PROCESS_H_INCLUDED_ -#include <nxt_conf.h> +#if (NXT_HAVE_CLONE) +#include <nxt_clone.h> +#endif typedef pid_t nxt_pid_t; -typedef uid_t nxt_uid_t; -typedef gid_t nxt_gid_t; - -typedef struct { - const char *user; - nxt_uid_t uid; - nxt_gid_t base_gid; - nxt_uint_t ngroups; - nxt_gid_t *gids; -} nxt_user_cred_t; - -typedef struct { - nxt_int_t flags; - nxt_conf_value_t *uidmap; - nxt_conf_value_t *gidmap; -} nxt_process_clone_t; typedef struct nxt_process_init_s nxt_process_init_t; typedef nxt_int_t (*nxt_process_start_t)(nxt_task_t *task, void *data); @@ -35,22 +21,23 @@ typedef nxt_int_t (*nxt_process_restart_t)(nxt_task_t *task, nxt_runtime_t *rt, nxt_process_init_t *init); struct nxt_process_init_s { - nxt_process_start_t start; - const char *name; - nxt_user_cred_t *user_cred; + nxt_mp_t *mem_pool; + nxt_process_start_t start; + const char *name; + nxt_credential_t *user_cred; - nxt_port_handlers_t *port_handlers; - const nxt_sig_event_t *signals; + const nxt_port_handlers_t *port_handlers; + const nxt_sig_event_t *signals; - nxt_process_type_t type; + nxt_process_type_t type; - void *data; - uint32_t stream; - - nxt_process_restart_t restart; + void *data; + uint32_t stream; union { - nxt_process_clone_t clone; +#if (NXT_HAVE_CLONE) + nxt_clone_t clone; +#endif } isolation; }; @@ -126,10 +113,11 @@ void nxt_process_connected_port_remove(nxt_process_t *process, nxt_port_t *nxt_process_connected_port_find(nxt_process_t *process, nxt_pid_t pid, nxt_port_id_t port_id); - void nxt_worker_process_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); +void nxt_init_destroy(nxt_runtime_t *rt, nxt_process_init_t *init); + #if (NXT_HAVE_SETPROCTITLE) @@ -155,12 +143,11 @@ NXT_EXPORT void nxt_process_title(nxt_task_t *task, const char *fmt, ...); #define nxt_abort() \ (void) raise(SIGABRT) -NXT_EXPORT nxt_int_t nxt_user_cred_get(nxt_task_t *task, nxt_user_cred_t *uc, - const char *group); -NXT_EXPORT nxt_int_t nxt_user_cred_set(nxt_task_t *task, nxt_user_cred_t *uc); NXT_EXPORT extern nxt_pid_t nxt_pid; NXT_EXPORT extern nxt_pid_t nxt_ppid; +NXT_EXPORT extern nxt_uid_t nxt_euid; +NXT_EXPORT extern nxt_gid_t nxt_egid; NXT_EXPORT extern char **nxt_process_argv; NXT_EXPORT extern char ***nxt_process_environ; diff --git a/src/nxt_python_wsgi.c b/src/nxt_python_wsgi.c index 5bb2fb2c..ea8b6903 100644 --- a/src/nxt_python_wsgi.c +++ b/src/nxt_python_wsgi.c @@ -50,6 +50,8 @@ #define PyBytes_Check PyString_Check #define PyBytes_GET_SIZE PyString_GET_SIZE #define PyBytes_AS_STRING PyString_AS_STRING +#define PyUnicode_InternInPlace PyString_InternInPlace +#define PyUnicode_AsUTF8 PyString_AS_STRING #endif typedef struct nxt_python_run_ctx_s nxt_python_run_ctx_t; @@ -64,15 +66,18 @@ typedef struct { } nxt_py_error_t; static nxt_int_t nxt_python_init(nxt_task_t *task, nxt_common_app_conf_t *conf); +static nxt_int_t nxt_python_init_strings(void); static void nxt_python_request_handler(nxt_unit_request_info_t *req); static void nxt_python_atexit(void); static PyObject *nxt_python_create_environ(nxt_task_t *task); static PyObject *nxt_python_get_environ(nxt_python_run_ctx_t *ctx); -static int nxt_python_add_sptr(nxt_python_run_ctx_t *ctx, const char *name, +static int nxt_python_add_sptr(nxt_python_run_ctx_t *ctx, PyObject *name, nxt_unit_sptr_t *sptr, uint32_t size); -static int nxt_python_add_str(nxt_python_run_ctx_t *ctx, const char *name, - const char *str, uint32_t size); +static int nxt_python_add_field(nxt_python_run_ctx_t *ctx, + nxt_unit_field_t *field); +static int nxt_python_add_obj(nxt_python_run_ctx_t *ctx, PyObject *name, + PyObject *value); static PyObject *nxt_py_start_resp(PyObject *self, PyObject *args); static int nxt_python_response_add_field(nxt_python_run_ctx_t *ctx, @@ -157,6 +162,48 @@ static PyThreadState *nxt_python_thread_state; static nxt_python_run_ctx_t *nxt_python_run_ctx; +static PyObject *nxt_py_80_str; +static PyObject *nxt_py_close_str; +static PyObject *nxt_py_content_length_str; +static PyObject *nxt_py_content_type_str; +static PyObject *nxt_py_http_str; +static PyObject *nxt_py_https_str; +static PyObject *nxt_py_path_info_str; +static PyObject *nxt_py_query_string_str; +static PyObject *nxt_py_remote_addr_str; +static PyObject *nxt_py_request_method_str; +static PyObject *nxt_py_request_uri_str; +static PyObject *nxt_py_server_addr_str; +static PyObject *nxt_py_server_name_str; +static PyObject *nxt_py_server_port_str; +static PyObject *nxt_py_server_protocol_str; +static PyObject *nxt_py_wsgi_uri_scheme_str; + +typedef struct { + nxt_str_t string; + PyObject **object_p; +} nxt_python_string_t; + +static nxt_python_string_t nxt_python_strings[] = { + { nxt_string("80"), &nxt_py_80_str }, + { nxt_string("close"), &nxt_py_close_str }, + { nxt_string("CONTENT_LENGTH"), &nxt_py_content_length_str }, + { nxt_string("CONTENT_TYPE"), &nxt_py_content_type_str }, + { nxt_string("http"), &nxt_py_http_str }, + { nxt_string("https"), &nxt_py_https_str }, + { nxt_string("PATH_INFO"), &nxt_py_path_info_str }, + { nxt_string("QUERY_STRING"), &nxt_py_query_string_str }, + { nxt_string("REMOTE_ADDR"), &nxt_py_remote_addr_str }, + { nxt_string("REQUEST_METHOD"), &nxt_py_request_method_str }, + { nxt_string("REQUEST_URI"), &nxt_py_request_uri_str }, + { nxt_string("SERVER_ADDR"), &nxt_py_server_addr_str }, + { nxt_string("SERVER_NAME"), &nxt_py_server_name_str }, + { nxt_string("SERVER_PORT"), &nxt_py_server_port_str }, + { nxt_string("SERVER_PROTOCOL"), &nxt_py_server_protocol_str }, + { nxt_string("wsgi.url_scheme"), &nxt_py_wsgi_uri_scheme_str }, +}; + + static nxt_int_t nxt_python_init(nxt_task_t *task, nxt_common_app_conf_t *conf) { @@ -239,6 +286,12 @@ nxt_python_init(nxt_task_t *task, nxt_common_app_conf_t *conf) Py_InitializeEx(0); module = NULL; + obj = NULL; + + if (nxt_slow_path(nxt_python_init_strings() != NXT_OK)) { + nxt_alert(task, "Python failed to init string objects"); + goto fail; + } obj = PySys_GetObject((char *) "stderr"); if (nxt_slow_path(obj == NULL)) { @@ -351,6 +404,7 @@ nxt_python_init(nxt_task_t *task, nxt_common_app_conf_t *conf) nxt_unit_default_init(task, &python_init); python_init.callbacks.request_handler = nxt_python_request_handler; + python_init.shm_limit = conf->shm_limit; unit_ctx = nxt_unit_init(&python_init); if (nxt_slow_path(unit_ctx == NULL)) { @@ -382,6 +436,31 @@ fail: } +static nxt_int_t +nxt_python_init_strings(void) +{ + PyObject *obj; + nxt_uint_t i; + nxt_python_string_t *pstr; + + for (i = 0; i < nxt_nitems(nxt_python_strings); i++) { + pstr = &nxt_python_strings[i]; + + obj = PyString_FromStringAndSize((char *) pstr->string.start, + pstr->string.length); + if (nxt_slow_path(obj == NULL)) { + return NXT_ERROR; + } + + PyUnicode_InternInPlace(&obj); + + *pstr->object_p = obj; + } + + return NXT_OK; +} + + static void nxt_python_request_handler(nxt_unit_request_info_t *req) { @@ -478,7 +557,7 @@ nxt_python_request_handler(nxt_unit_request_info_t *req) rc = NXT_UNIT_ERROR; } - close = PyObject_GetAttrString(response, "close"); + close = PyObject_GetAttr(response, nxt_py_close_str); if (close != NULL) { result = PyObject_CallFunction(close, NULL); @@ -512,6 +591,12 @@ done: static void nxt_python_atexit(void) { + nxt_uint_t i; + + for (i = 0; i < nxt_nitems(nxt_python_strings); i++) { + Py_XDECREF(*nxt_python_strings[i].object_p); + } + Py_XDECREF(nxt_py_stderr_flush); Py_XDECREF(nxt_py_application); Py_XDECREF(nxt_py_start_resp_obj); @@ -655,7 +740,6 @@ static PyObject * nxt_python_get_environ(nxt_python_run_ctx_t *ctx) { int rc; - char *name; uint32_t i; PyObject *environ; nxt_unit_field_t *f; @@ -681,47 +765,52 @@ nxt_python_get_environ(nxt_python_run_ctx_t *ctx) } \ } while(0) - RC(nxt_python_add_sptr(ctx, "REQUEST_METHOD", &r->method, + RC(nxt_python_add_sptr(ctx, nxt_py_request_method_str, &r->method, r->method_length)); - RC(nxt_python_add_sptr(ctx, "REQUEST_URI", &r->target, r->target_length)); - RC(nxt_python_add_sptr(ctx, "QUERY_STRING", &r->query, r->query_length)); - RC(nxt_python_add_sptr(ctx, "PATH_INFO", &r->path, r->path_length)); - - RC(nxt_python_add_sptr(ctx, "REMOTE_ADDR", &r->remote, r->remote_length)); - RC(nxt_python_add_sptr(ctx, "SERVER_ADDR", &r->local, r->local_length)); + RC(nxt_python_add_sptr(ctx, nxt_py_request_uri_str, &r->target, + r->target_length)); + RC(nxt_python_add_sptr(ctx, nxt_py_query_string_str, &r->query, + r->query_length)); + RC(nxt_python_add_sptr(ctx, nxt_py_path_info_str, &r->path, + r->path_length)); + + RC(nxt_python_add_sptr(ctx, nxt_py_remote_addr_str, &r->remote, + r->remote_length)); + RC(nxt_python_add_sptr(ctx, nxt_py_server_addr_str, &r->local, + r->local_length)); if (r->tls) { - RC(nxt_python_add_str(ctx, "wsgi.url_scheme", "https", 5)); - + RC(nxt_python_add_obj(ctx, nxt_py_wsgi_uri_scheme_str, + nxt_py_https_str)); } else { - RC(nxt_python_add_str(ctx, "wsgi.url_scheme", "http", 4)); + RC(nxt_python_add_obj(ctx, nxt_py_wsgi_uri_scheme_str, + nxt_py_http_str)); } - RC(nxt_python_add_sptr(ctx, "SERVER_PROTOCOL", &r->version, + RC(nxt_python_add_sptr(ctx, nxt_py_server_protocol_str, &r->version, r->version_length)); - RC(nxt_python_add_sptr(ctx, "SERVER_NAME", &r->server_name, + RC(nxt_python_add_sptr(ctx, nxt_py_server_name_str, &r->server_name, r->server_name_length)); - RC(nxt_python_add_str(ctx, "SERVER_PORT", "80", 2)); + RC(nxt_python_add_obj(ctx, nxt_py_server_port_str, nxt_py_80_str)); for (i = 0; i < r->fields_count; i++) { f = r->fields + i; - name = nxt_unit_sptr_get(&f->name); - RC(nxt_python_add_sptr(ctx, name, &f->value, f->value_length)); + RC(nxt_python_add_field(ctx, f)); } if (r->content_length_field != NXT_UNIT_NONE_FIELD) { f = r->fields + r->content_length_field; - RC(nxt_python_add_sptr(ctx, "CONTENT_LENGTH", &f->value, + RC(nxt_python_add_sptr(ctx, nxt_py_content_length_str, &f->value, f->value_length)); } if (r->content_type_field != NXT_UNIT_NONE_FIELD) { f = r->fields + r->content_type_field; - RC(nxt_python_add_sptr(ctx, "CONTENT_TYPE", &f->value, + RC(nxt_python_add_sptr(ctx, nxt_py_content_type_str, &f->value, f->value_length)); } @@ -738,7 +827,7 @@ fail: static int -nxt_python_add_sptr(nxt_python_run_ctx_t *ctx, const char *name, +nxt_python_add_sptr(nxt_python_run_ctx_t *ctx, PyObject *name, nxt_unit_sptr_t *sptr, uint32_t size) { char *src; @@ -756,10 +845,10 @@ nxt_python_add_sptr(nxt_python_run_ctx_t *ctx, const char *name, return NXT_UNIT_ERROR; } - if (nxt_slow_path(PyDict_SetItemString(ctx->environ, name, value) != 0)) { + if (nxt_slow_path(PyDict_SetItem(ctx->environ, name, value) != 0)) { nxt_unit_req_error(ctx->req, "Python failed to set the \"%s\" environ value", - name); + PyUnicode_AsUTF8(name)); Py_DECREF(value); return NXT_UNIT_ERROR; @@ -772,37 +861,67 @@ nxt_python_add_sptr(nxt_python_run_ctx_t *ctx, const char *name, static int -nxt_python_add_str(nxt_python_run_ctx_t *ctx, const char *name, - const char *str, uint32_t size) +nxt_python_add_field(nxt_python_run_ctx_t *ctx, nxt_unit_field_t *field) { - PyObject *value; + char *src; + PyObject *name, *value; - if (nxt_slow_path(str == NULL)) { - return NXT_UNIT_OK; + src = nxt_unit_sptr_get(&field->name); + + name = PyString_FromStringAndSize(src, field->name_length); + if (nxt_slow_path(name == NULL)) { + nxt_unit_req_error(ctx->req, + "Python failed to create name string \"%.*s\"", + (int) field->name_length, src); + nxt_python_print_exception(); + + return NXT_UNIT_ERROR; } - value = PyString_FromStringAndSize(str, size); + src = nxt_unit_sptr_get(&field->value); + + value = PyString_FromStringAndSize(src, field->value_length); if (nxt_slow_path(value == NULL)) { nxt_unit_req_error(ctx->req, "Python failed to create value string \"%.*s\"", - (int) size, str); + (int) field->value_length, src); nxt_python_print_exception(); - return NXT_UNIT_ERROR; + goto fail; } - if (nxt_slow_path(PyDict_SetItemString(ctx->environ, name, value) != 0)) { + if (nxt_slow_path(PyDict_SetItem(ctx->environ, name, value) != 0)) { nxt_unit_req_error(ctx->req, "Python failed to set the \"%s\" environ value", - name); + PyUnicode_AsUTF8(name)); + goto fail; + } - Py_DECREF(value); + Py_DECREF(name); + Py_DECREF(value); + + return NXT_UNIT_OK; + +fail: + + Py_DECREF(name); + Py_XDECREF(value); + + return NXT_UNIT_ERROR; +} + + +static int +nxt_python_add_obj(nxt_python_run_ctx_t *ctx, PyObject *name, PyObject *value) +{ + if (nxt_slow_path(PyDict_SetItem(ctx->environ, name, value) != 0)) { + nxt_unit_req_error(ctx->req, + "Python failed to set the \"%s\" environ value", + PyUnicode_AsUTF8(name)); return NXT_UNIT_ERROR; } - Py_DECREF(value); - return NXT_UNIT_OK; } diff --git a/src/nxt_router.c b/src/nxt_router.c index b9f5d921..6a1f3792 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -95,16 +95,16 @@ nxt_request_app_link_inc_use(nxt_request_app_link_t *req_app_link) } nxt_inline void -nxt_request_app_link_dec_use(nxt_request_app_link_t *req_app_link) +nxt_request_app_link_chk_use(nxt_request_app_link_t *req_app_link, int i) { #if (NXT_DEBUG) int c; - c = nxt_atomic_fetch_add(&req_app_link->use_count, -1); + c = nxt_atomic_fetch_add(&req_app_link->use_count, i); - nxt_assert(c > 1); + nxt_assert((c + i) > 0); #else - (void) nxt_atomic_fetch_add(&req_app_link->use_count, -1); + (void) nxt_atomic_fetch_add(&req_app_link->use_count, i); #endif } @@ -248,6 +248,7 @@ static nxt_int_t nxt_router_http_request_done(nxt_task_t *task, nxt_http_request_t *r); static void nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data); +static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); extern const nxt_http_request_state_t nxt_http_websocket; @@ -276,6 +277,7 @@ nxt_port_handlers_t nxt_router_process_port_handlers = { .access_log = nxt_router_access_log_reopen_handler, .rpc_ready = nxt_port_rpc_handler, .rpc_error = nxt_port_rpc_handler, + .oosm = nxt_router_oosm_handler, }; @@ -600,8 +602,6 @@ nxt_request_app_link_update_peer(nxt_task_t *task, nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data, req_app_link->app_port->pid); } - - nxt_request_app_link_use(task, req_app_link, -1); } @@ -2750,6 +2750,7 @@ static nxt_port_handlers_t nxt_router_app_port_handlers = { .rpc_error = nxt_port_rpc_handler, .mmap = nxt_port_mmap_handler, .data = nxt_port_rpc_handler, + .oosm = nxt_router_oosm_handler, }; @@ -3732,8 +3733,6 @@ nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, cancelled = nxt_router_msg_cancel(task, &req_app_link->msg_info, req_app_link->stream); if (cancelled) { - nxt_request_app_link_inc_use(req_app_link); - res = nxt_router_app_port(task, req_rpc_data->app, req_app_link); if (res == NXT_OK) { @@ -3751,6 +3750,8 @@ nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_router_app_prepare_request(task, req_app_link); } + nxt_request_app_link_use(task, req_app_link, -1); + msg->port_msg.last = 0; return; @@ -4015,6 +4016,8 @@ nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data) #endif nxt_router_app_prepare_request(task, req_app_link); + + nxt_request_app_link_use(task, req_app_link, -1); } @@ -4148,8 +4151,6 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, re_ra->stream); if (cancelled) { - nxt_request_app_link_inc_use(re_ra); - state.req_app_link = re_ra; state.app = app; @@ -4217,19 +4218,38 @@ re_ra_cancelled: if (re_ra != NULL) { if (nxt_router_port_post_select(task, &state) == NXT_OK) { + /* + * There should be call nxt_request_app_link_inc_use(re_ra), + * but we need to decrement use then. So, let's skip both. + */ + nxt_work_queue_add(&task->thread->engine->fast_work_queue, nxt_router_app_process_request, &task->thread->engine->task, app, re_ra); + + } else { + /* + * This call should be unconditional, but we want to spare + * couple of CPU ticks to postpone the head death of the universe. + */ + + nxt_request_app_link_use(task, re_ra, -1); } } if (req_app_link != NULL) { - nxt_request_app_link_use(task, req_app_link, -1); + /* + * Here we do the same trick as described above, + * but without conditions. + * Skip required nxt_request_app_link_inc_use(req_app_link). + */ nxt_work_queue_add(&task->thread->engine->fast_work_queue, nxt_router_app_process_request, &task->thread->engine->task, app, req_app_link); + /* ... skip nxt_request_app_link_use(task, req_app_link, -1) too. */ + goto adjust_use; } @@ -4477,6 +4497,7 @@ nxt_router_free_app(nxt_task_t *task, void *obj, void *data) static void nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state) { + int ra_use_delta; nxt_app_t *app; nxt_bool_t can_start_process; nxt_request_app_link_t *req_app_link; @@ -4485,11 +4506,7 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state) app = state->app; state->failed_port_use_delta = 0; - - if (nxt_queue_chk_remove(&req_app_link->link_app_requests)) - { - nxt_request_app_link_dec_use(req_app_link); - } + ra_use_delta = -nxt_queue_chk_remove(&req_app_link->link_app_requests); if (nxt_queue_chk_remove(&req_app_link->link_port_pending)) { @@ -4498,7 +4515,7 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state) nxt_queue_remove(&req_app_link->link_app_pending); req_app_link->link_app_pending.next = NULL; - nxt_request_app_link_dec_use(req_app_link); + ra_use_delta--; } state->failed_port = req_app_link->app_port; @@ -4538,7 +4555,7 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state) &req_app_link->link_app_requests); } - nxt_request_app_link_inc_use(req_app_link); + ra_use_delta++; nxt_debug(task, "req_app_link stream #%uD enqueue to app->requests", req_app_link->stream); @@ -4569,6 +4586,8 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state) } } + nxt_request_app_link_chk_use(req_app_link, ra_use_delta); + fail: state->shared_ra = req_app_link; @@ -4596,7 +4615,6 @@ nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state) nxt_request_app_link_error(state->req_app_link, 500, "Failed to allocate shared req<->app link"); - nxt_request_app_link_use(task, state->req_app_link, -1); return NXT_ERROR; } @@ -4625,7 +4643,6 @@ nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state) if (nxt_slow_path(res != NXT_OK)) { nxt_request_app_link_error(req_app_link, 500, "Failed to start app process"); - nxt_request_app_link_use(task, req_app_link, -1); return NXT_ERROR; } @@ -4686,19 +4703,19 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r, nxt_request_app_link_init(task, req_app_link, req_rpc_data); res = nxt_router_app_port(task, app, req_app_link); + req_app_link = req_rpc_data->req_app_link; - if (res != NXT_OK) { - return; - } + if (res == NXT_OK) { + port = req_app_link->app_port; - req_app_link = req_rpc_data->req_app_link; - port = req_app_link->app_port; + nxt_assert(port != NULL); - nxt_assert(port != NULL); + nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data, port->pid); - nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data, port->pid); + nxt_router_app_prepare_request(task, req_app_link); + } - nxt_router_app_prepare_request(task, req_app_link); + nxt_request_app_link_use(task, req_app_link, -1); } @@ -5172,8 +5189,6 @@ nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data) pending_ra->stream); if (cancelled) { - nxt_request_app_link_inc_use(pending_ra); - state.req_app_link = pending_ra; state.app = app; @@ -5186,10 +5201,12 @@ nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data) nxt_thread_mutex_unlock(&app->mutex); - if (pending_ra != NULL - && nxt_router_port_post_select(task, &state) == NXT_OK) - { - nxt_router_app_prepare_request(task, pending_ra); + if (pending_ra != NULL) { + if (nxt_router_port_post_select(task, &state) == NXT_OK) { + nxt_router_app_prepare_request(task, pending_ra); + } + + nxt_request_app_link_use(task, pending_ra, -1); } nxt_debug(task, "send quit to app '%V' pid %PI", &app->name, port->pid); @@ -5227,3 +5244,56 @@ nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data) nxt_mp_release(r->mem_pool); } + + +static void +nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + size_t mi; + uint32_t i; + nxt_bool_t ack; + nxt_process_t *process; + nxt_free_map_t *m; + nxt_port_mmap_header_t *hdr; + + nxt_debug(task, "oosm in %PI", msg->port_msg.pid); + + process = nxt_runtime_process_find(task->thread->runtime, + msg->port_msg.pid); + if (nxt_slow_path(process == NULL)) { + return; + } + + ack = 0; + + /* + * To mitigate possible racing condition (when OOSM message received + * after some of the memory was already freed), need to try to find + * first free segment in shared memory and send ACK if found. + */ + + nxt_thread_mutex_lock(&process->incoming.mutex); + + for (i = 0; i < process->incoming.size; i++) { + hdr = process->incoming.elts[i].mmap_handler->hdr; + m = hdr->free_map; + + for (mi = 0; mi < MAX_FREE_IDX; mi++) { + if (m[mi] != 0) { + ack = 1; + + nxt_debug(task, "oosm: already free #%uD %uz = 0x%08xA", + i, mi, m[mi]); + + break; + } + } + } + + nxt_thread_mutex_unlock(&process->incoming.mutex); + + if (ack) { + (void) nxt_port_socket_write(task, msg->port, NXT_PORT_MSG_SHM_ACK, + -1, 0, 0, NULL); + } +} diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c index 096aabc4..80b25c1b 100644 --- a/src/nxt_runtime.c +++ b/src/nxt_runtime.c @@ -705,7 +705,10 @@ nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt) } if (rt->capabilities.setid) { - if (nxt_user_cred_get(task, &rt->user_cred, rt->group) != NXT_OK) { + ret = nxt_credential_get(task, rt->mem_pool, &rt->user_cred, + rt->group); + + if (nxt_slow_path(ret != NXT_OK)) { return NXT_ERROR; } @@ -1323,7 +1326,7 @@ nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process) nxt_thread_mutex_destroy(&process->cp_mutex); if (process->init != NULL) { - nxt_mp_free(rt->mem_pool, process->init); + nxt_mp_destroy(process->init->mem_pool); } nxt_mp_free(rt->mem_pool, process); diff --git a/src/nxt_runtime.h b/src/nxt_runtime.h index d5b340b6..f8d19ec6 100644 --- a/src/nxt_runtime.h +++ b/src/nxt_runtime.h @@ -58,7 +58,7 @@ struct nxt_runtime_s { const char *engine; uint32_t engine_connections; uint32_t auxiliary_threads; - nxt_user_cred_t user_cred; + nxt_credential_t user_cred; nxt_capabilities_t capabilities; const char *group; const char *pid; diff --git a/src/nxt_sockaddr.c b/src/nxt_sockaddr.c index 57dfbfa6..af696a6b 100644 --- a/src/nxt_sockaddr.c +++ b/src/nxt_sockaddr.c @@ -1140,6 +1140,10 @@ nxt_inet_addr(u_char *buf, size_t length) in_addr_t addr; nxt_uint_t digit, octet, dots; + if (nxt_slow_path(*(buf + length - 1) == '.')) { + return INADDR_NONE; + } + addr = 0; octet = 0; dots = 0; diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 0cf32916..95874db3 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -19,12 +19,21 @@ #include <linux/memfd.h> #endif +#define NXT_UNIT_MAX_PLAIN_SIZE 1024 +#define NXT_UNIT_LOCAL_BUF_SIZE \ + (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t)) + +#define NXT_UNIT_MAX_PLAIN_SIZE 1024 +#define NXT_UNIT_LOCAL_BUF_SIZE \ + (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t)) + typedef struct nxt_unit_impl_s nxt_unit_impl_t; typedef struct nxt_unit_mmap_s nxt_unit_mmap_t; typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t; typedef struct nxt_unit_process_s nxt_unit_process_t; typedef struct nxt_unit_mmap_buf_s nxt_unit_mmap_buf_t; typedef struct nxt_unit_recv_msg_s nxt_unit_recv_msg_t; +typedef struct nxt_unit_read_buf_s nxt_unit_read_buf_t; typedef struct nxt_unit_ctx_impl_s nxt_unit_ctx_impl_t; typedef struct nxt_unit_port_impl_s nxt_unit_port_impl_t; typedef struct nxt_unit_request_info_impl_s nxt_unit_request_info_impl_t; @@ -37,9 +46,10 @@ nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head, nxt_unit_mmap_buf_t *mmap_buf); nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev, nxt_unit_mmap_buf_t *mmap_buf); -nxt_inline void nxt_unit_mmap_buf_remove(nxt_unit_mmap_buf_t *mmap_buf); +nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf); static int nxt_unit_read_env(nxt_unit_port_t *ready_port, - nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream); + nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream, + uint32_t *shm_limit); static int nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, uint32_t stream); static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, @@ -48,6 +58,7 @@ static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); +static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx); static nxt_unit_request_info_impl_t *nxt_unit_request_info_get( nxt_unit_ctx_t *ctx); static void nxt_unit_request_info_release(nxt_unit_request_info_t *req); @@ -63,11 +74,19 @@ static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf); static int nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, nxt_unit_mmap_buf_t *mmap_buf, int last); static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf); +static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf); +static nxt_unit_read_buf_t *nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx); +static nxt_unit_read_buf_t *nxt_unit_read_buf_get_impl( + nxt_unit_ctx_impl_t *ctx_impl); +static void nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx, + nxt_unit_read_buf_t *rbuf); static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size); static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, - nxt_chunk_id_t *c, int n); + nxt_chunk_id_t *c, int *n, int min_n); +static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id); +static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx); static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i); static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, int n); @@ -75,7 +94,7 @@ static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd); static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, uint32_t size, - nxt_unit_mmap_buf_t *mmap_buf); + uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf); static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd); static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps); @@ -88,14 +107,18 @@ static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); -static int nxt_unit_mmap_release(nxt_port_mmap_header_t *hdr, void *start, - uint32_t size); +static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, + nxt_unit_process_t *process, + nxt_port_mmap_header_t *hdr, void *start, uint32_t size); +static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid); static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid); static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_ctx_t *ctx, pid_t pid, int remove); static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib); +static void nxt_unit_read_buf(nxt_unit_ctx_t *ctx, + nxt_unit_read_buf_t *rbuf); static int nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd); @@ -135,10 +158,12 @@ struct nxt_unit_mmap_buf_s { nxt_unit_mmap_buf_t **prev; nxt_port_mmap_header_t *hdr; -// nxt_queue_link_t link; nxt_unit_port_id_t port_id; nxt_unit_request_info_t *req; nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_process_t *process; + char *free_ptr; + char *plain_ptr; }; @@ -196,8 +221,14 @@ struct nxt_unit_websocket_frame_impl_s { nxt_queue_link_t link; nxt_unit_ctx_impl_t *ctx_impl; +}; + - void *retain_buf; +struct nxt_unit_read_buf_s { + nxt_unit_read_buf_t *next; + ssize_t size; + char buf[16384]; + char oob[256]; }; @@ -225,7 +256,12 @@ struct nxt_unit_ctx_impl_s { /* of nxt_unit_request_info_impl_t */ nxt_lvlhsh_t requests; + nxt_unit_read_buf_t *pending_read_head; + nxt_unit_read_buf_t **pending_read_tail; + nxt_unit_read_buf_t *free_read_buf; + nxt_unit_mmap_buf_t ctx_buf[2]; + nxt_unit_read_buf_t ctx_read_buf; nxt_unit_request_info_impl_t req; }; @@ -236,6 +272,7 @@ struct nxt_unit_impl_s { nxt_unit_callbacks_t callbacks; uint32_t request_data_size; + uint32_t shm_mmap_limit; pthread_mutex_t mutex; @@ -271,6 +308,7 @@ struct nxt_unit_mmaps_s { pthread_mutex_t mutex; uint32_t size; uint32_t cap; + nxt_atomic_t allocated_chunks; nxt_unit_mmap_t *elts; }; @@ -302,7 +340,7 @@ nxt_unit_ctx_t * nxt_unit_init(nxt_unit_init_t *init) { int rc; - uint32_t ready_stream; + uint32_t ready_stream, shm_limit; nxt_unit_ctx_t *ctx; nxt_unit_impl_t *lib; nxt_unit_port_t ready_port, read_port; @@ -325,12 +363,20 @@ nxt_unit_init(nxt_unit_init_t *init) ready_port.id.id); nxt_unit_port_id_init(&read_port.id, read_port.id.pid, read_port.id.id); + } else { rc = nxt_unit_read_env(&ready_port, &read_port, &lib->log_fd, - &ready_stream); + &ready_stream, &shm_limit); if (nxt_slow_path(rc != NXT_UNIT_OK)) { goto fail; } + + lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1) + / PORT_MMAP_DATA_SIZE; + } + + if (nxt_slow_path(lib->shm_mmap_limit < 1)) { + lib->shm_mmap_limit = 1; } lib->pid = read_port.id.pid; @@ -395,6 +441,8 @@ nxt_unit_create(nxt_unit_init_t *init) lib->callbacks = init->callbacks; lib->request_data_size = init->request_data_size; + lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1) + / PORT_MMAP_DATA_SIZE; lib->processes.slot = NULL; lib->ports.slot = NULL; @@ -479,6 +527,11 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link); + ctx_impl->pending_read_head = NULL; + ctx_impl->pending_read_tail = &ctx_impl->pending_read_head; + ctx_impl->free_read_buf = &ctx_impl->ctx_read_buf; + ctx_impl->ctx_read_buf.next = NULL; + ctx_impl->req.req.ctx = &ctx_impl->ctx; ctx_impl->req.req.unit = &lib->unit; @@ -517,7 +570,7 @@ nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev, nxt_inline void -nxt_unit_mmap_buf_remove(nxt_unit_mmap_buf_t *mmap_buf) +nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf) { nxt_unit_mmap_buf_t **prev; @@ -535,7 +588,7 @@ nxt_unit_mmap_buf_remove(nxt_unit_mmap_buf_t *mmap_buf) static int nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port, - int *log_fd, uint32_t *stream) + int *log_fd, uint32_t *stream, uint32_t *shm_limit) { int rc; int ready_fd, read_fd; @@ -570,14 +623,14 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port, "%"PRIu32";" "%"PRId64",%"PRIu32",%d;" "%"PRId64",%"PRIu32",%d;" - "%d", + "%d,%"PRIu32, &ready_stream, &ready_pid, &ready_id, &ready_fd, &read_pid, &read_id, &read_fd, - log_fd); + log_fd, shm_limit); - if (nxt_slow_path(rc != 8)) { - nxt_unit_alert(NULL, "failed to scan variables"); + if (nxt_slow_path(rc != 9)) { + nxt_unit_alert(NULL, "failed to scan variables: %d", rc); return NXT_UNIT_ERROR; } @@ -756,6 +809,10 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, rc = NXT_UNIT_OK; break; + case _NXT_PORT_MSG_SHM_ACK: + rc = nxt_unit_process_shm_ack(ctx); + break; + default: nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d", port_msg->stream, (int) port_msg->type); @@ -961,7 +1018,6 @@ nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) ws_impl->ws.req = req; ws_impl->buf = NULL; - ws_impl->retain_buf = NULL; if (recv_msg->mmap) { for (b = recv_msg->incoming_buf; b != NULL; b = b->next) { @@ -986,7 +1042,6 @@ nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) return NXT_UNIT_ERROR; } - b->hdr = NULL; b->req = req; b->buf.start = recv_msg->start; b->buf.free = b->buf.start; @@ -1038,6 +1093,23 @@ nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) } +static int +nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx) +{ + nxt_unit_impl_t *lib; + nxt_unit_callbacks_t *cb; + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + cb = &lib->callbacks; + + if (cb->shm_ack_handler != NULL) { + cb->shm_ack_handler(ctx); + } + + return NXT_UNIT_OK; +} + + static nxt_unit_request_info_impl_t * nxt_unit_request_info_get(nxt_unit_ctx_t *ctx) { @@ -1193,12 +1265,6 @@ nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws) ws->req = NULL; - if (ws_impl->retain_buf != NULL) { - free(ws_impl->retain_buf); - - ws_impl->retain_buf = NULL; - } - pthread_mutex_lock(&ws_impl->ctx_impl->mutex); nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link); @@ -1649,7 +1715,7 @@ nxt_unit_response_send(nxt_unit_request_info_t *req) req->response_buf = NULL; req_impl->state = NXT_UNIT_RS_RESPONSE_SENT; - nxt_unit_mmap_buf_release(mmap_buf); + nxt_unit_mmap_buf_free(mmap_buf); } return rc; @@ -1697,7 +1763,8 @@ nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size) nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf); rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, - &req->response_port, size, mmap_buf); + &req->response_port, size, size, mmap_buf, + NULL); if (nxt_slow_path(rc != NXT_UNIT_OK)) { nxt_unit_mmap_buf_release(mmap_buf); @@ -1755,13 +1822,16 @@ nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx) } else { mmap_buf = ctx_impl->free_buf; - nxt_unit_mmap_buf_remove(mmap_buf); + nxt_unit_mmap_buf_unlink(mmap_buf); pthread_mutex_unlock(&ctx_impl->mutex); } mmap_buf->ctx_impl = ctx_impl; + mmap_buf->hdr = NULL; + mmap_buf->free_ptr = NULL; + return mmap_buf; } @@ -1769,7 +1839,7 @@ nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx) static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf) { - nxt_unit_mmap_buf_remove(mmap_buf); + nxt_unit_mmap_buf_unlink(mmap_buf); pthread_mutex_lock(&mmap_buf->ctx_impl->mutex); @@ -1896,7 +1966,7 @@ nxt_unit_buf_send(nxt_unit_buf_t *buf) } } - nxt_unit_mmap_buf_release(mmap_buf); + nxt_unit_mmap_buf_free(mmap_buf); return NXT_UNIT_OK; } @@ -1917,7 +1987,7 @@ nxt_unit_buf_send_done(nxt_unit_buf_t *buf) rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 1); if (nxt_slow_path(rc == NXT_UNIT_OK)) { - nxt_unit_mmap_buf_release(mmap_buf); + nxt_unit_mmap_buf_free(mmap_buf); nxt_unit_request_info_release(req); @@ -1936,7 +2006,8 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, nxt_port_mmap_msg_t mmap_msg; } m; - u_char *end, *last_used, *first_free; + int rc; + u_char *last_used, *first_free; ssize_t res; nxt_chunk_id_t first_free_chunk; nxt_unit_buf_t *buf; @@ -1960,38 +2031,85 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, m.msg.mf = 0; m.msg.tracking = 0; - if (hdr != NULL) { + rc = NXT_UNIT_ERROR; + + if (m.msg.mmap) { m.mmap_msg.mmap_id = hdr->id; m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr, (u_char *) buf->start); - } - nxt_unit_debug(ctx, "#%"PRIu32": send mmap: (%d,%d,%d)", - stream, - (int) m.mmap_msg.mmap_id, - (int) m.mmap_msg.chunk_id, - (int) m.mmap_msg.size); + nxt_unit_debug(ctx, "#%"PRIu32": send mmap: (%d,%d,%d)", + stream, + (int) m.mmap_msg.mmap_id, + (int) m.mmap_msg.chunk_id, + (int) m.mmap_msg.size); - res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, &m, - m.msg.mmap ? sizeof(m) : sizeof(m.msg), - NULL, 0); - if (nxt_slow_path(res != sizeof(m))) { - return NXT_UNIT_ERROR; - } + res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, &m, sizeof(m), + NULL, 0); + if (nxt_slow_path(res != sizeof(m))) { + goto free_buf; + } - if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE && hdr != NULL) { last_used = (u_char *) buf->free - 1; - first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1; - first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk); - end = (u_char *) buf->end; - nxt_unit_mmap_release(hdr, first_free, (uint32_t) (end - first_free)); + if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) { + first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk); + + buf->start = (char *) first_free; + buf->free = buf->start; + + if (buf->end < buf->start) { + buf->end = buf->start; + } + + } else { + buf->start = NULL; + buf->free = NULL; + buf->end = NULL; + + mmap_buf->hdr = NULL; + } + + nxt_atomic_fetch_add(&mmap_buf->process->outgoing.allocated_chunks, + (int) m.mmap_msg.chunk_id - (int) first_free_chunk); + + nxt_unit_debug(ctx, "process %d allocated_chunks %d", + mmap_buf->process->pid, + mmap_buf->process->outgoing.allocated_chunks); + + } else { + if (nxt_slow_path(mmap_buf->plain_ptr == NULL + || mmap_buf->plain_ptr > buf->start - sizeof(m.msg))) + { + nxt_unit_warn(ctx, "#%"PRIu32": failed to send plain memory buffer" + ": no space reserved for message header", stream); + + goto free_buf; + } + + memcpy(buf->start - sizeof(m.msg), &m.msg, sizeof(m.msg)); + + nxt_unit_debug(ctx, "#%"PRIu32": send plain: %d", + stream, + (int) (sizeof(m.msg) + m.mmap_msg.size)); - buf->end = (char *) first_free; + res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, + buf->start - sizeof(m.msg), + m.mmap_msg.size + sizeof(m.msg), + NULL, 0); + if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) { + goto free_buf; + } } - return NXT_UNIT_OK; + rc = NXT_UNIT_OK; + +free_buf: + + nxt_unit_free_outgoing_buf(mmap_buf); + + return rc; } @@ -2005,12 +2123,83 @@ nxt_unit_buf_free(nxt_unit_buf_t *buf) static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf) { - if (nxt_fast_path(mmap_buf->hdr != NULL)) { - nxt_unit_mmap_release(mmap_buf->hdr, mmap_buf->buf.start, + nxt_unit_free_outgoing_buf(mmap_buf); + + nxt_unit_mmap_buf_release(mmap_buf); +} + + +static void +nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf) +{ + if (mmap_buf->hdr != NULL) { + nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx, + mmap_buf->process, + mmap_buf->hdr, mmap_buf->buf.start, mmap_buf->buf.end - mmap_buf->buf.start); + + mmap_buf->hdr = NULL; + + return; } - nxt_unit_mmap_buf_release(mmap_buf); + if (mmap_buf->free_ptr != NULL) { + free(mmap_buf->free_ptr); + + mmap_buf->free_ptr = NULL; + } +} + + +static nxt_unit_read_buf_t * +nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx) +{ + nxt_unit_ctx_impl_t *ctx_impl; + + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + pthread_mutex_lock(&ctx_impl->mutex); + + return nxt_unit_read_buf_get_impl(ctx_impl); +} + + +static nxt_unit_read_buf_t * +nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl) +{ + nxt_unit_read_buf_t *rbuf; + + if (ctx_impl->free_read_buf != NULL) { + rbuf = ctx_impl->free_read_buf; + ctx_impl->free_read_buf = rbuf->next; + + pthread_mutex_unlock(&ctx_impl->mutex); + + return rbuf; + } + + pthread_mutex_unlock(&ctx_impl->mutex); + + rbuf = malloc(sizeof(nxt_unit_read_buf_t)); + + return rbuf; +} + + +static void +nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx, + nxt_unit_read_buf_t *rbuf) +{ + nxt_unit_ctx_impl_t *ctx_impl; + + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + pthread_mutex_lock(&ctx_impl->mutex); + + rbuf->next = ctx_impl->free_read_buf; + ctx_impl->free_read_buf = rbuf; + + pthread_mutex_unlock(&ctx_impl->mutex); } @@ -2047,61 +2236,93 @@ int nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start, size_t size) { + ssize_t res; + + res = nxt_unit_response_write_nb(req, start, size, size); + + return res < 0 ? -res : NXT_UNIT_OK; +} + + +ssize_t +nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start, + size_t size, size_t min_size) +{ int rc; - uint32_t part_size; + ssize_t sent; + uint32_t part_size, min_part_size, buf_size; const char *part_start; nxt_unit_mmap_buf_t mmap_buf; nxt_unit_request_info_impl_t *req_impl; + char local_buf[NXT_UNIT_LOCAL_BUF_SIZE]; req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); part_start = start; + sent = 0; + + if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { + nxt_unit_req_warn(req, "write: response not initialized yet"); + + return -NXT_UNIT_ERROR; + } /* Check if response is not send yet. */ - if (nxt_slow_path(req->response_buf)) { + if (nxt_slow_path(req->response_buf != NULL)) { part_size = req->response_buf->end - req->response_buf->free; part_size = nxt_min(size, part_size); rc = nxt_unit_response_add_content(req, part_start, part_size); if (nxt_slow_path(rc != NXT_UNIT_OK)) { - return rc; + return -rc; } rc = nxt_unit_response_send(req); if (nxt_slow_path(rc != NXT_UNIT_OK)) { - return rc; + return -rc; } size -= part_size; part_start += part_size; + sent += part_size; + + min_size -= nxt_min(min_size, part_size); } while (size > 0) { part_size = nxt_min(size, PORT_MMAP_DATA_SIZE); + min_part_size = nxt_min(min_size, part_size); + min_part_size = nxt_min(min_part_size, PORT_MMAP_CHUNK_SIZE); rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, &req->response_port, part_size, - &mmap_buf); + min_part_size, &mmap_buf, local_buf); if (nxt_slow_path(rc != NXT_UNIT_OK)) { - return rc; + return -rc; } + buf_size = mmap_buf.buf.end - mmap_buf.buf.free; + if (nxt_slow_path(buf_size == 0)) { + return sent; + } + part_size = nxt_min(buf_size, part_size); + mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free, part_start, part_size); rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0); if (nxt_slow_path(rc != NXT_UNIT_OK)) { - nxt_unit_mmap_release(mmap_buf.hdr, mmap_buf.buf.start, - mmap_buf.buf.end - mmap_buf.buf.start); - - return rc; + return -rc; } size -= part_size; part_start += part_size; + sent += part_size; + + min_size -= nxt_min(min_size, part_size); } - return NXT_UNIT_OK; + return sent; } @@ -2109,9 +2330,15 @@ int nxt_unit_response_write_cb(nxt_unit_request_info_t *req, nxt_unit_read_info_t *read_info) { - int rc; - ssize_t n; - nxt_unit_buf_t *buf; + int rc; + ssize_t n; + uint32_t buf_size; + nxt_unit_buf_t *buf; + nxt_unit_mmap_buf_t mmap_buf; + nxt_unit_request_info_impl_t *req_impl; + char local_buf[NXT_UNIT_LOCAL_BUF_SIZE]; + + req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); /* Check if response is not send yet. */ if (nxt_slow_path(req->response_buf)) { @@ -2159,20 +2386,24 @@ nxt_unit_response_write_cb(nxt_unit_request_info_t *req, nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"", read_info->buf_size); - buf = nxt_unit_response_buf_alloc(req, nxt_min(read_info->buf_size, - PORT_MMAP_DATA_SIZE)); - if (nxt_slow_path(buf == NULL)) { - nxt_unit_req_error(req, "Failed to allocate buf for content"); + buf_size = nxt_min(read_info->buf_size, PORT_MMAP_DATA_SIZE); - return NXT_UNIT_ERROR; + rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, + &req->response_port, + buf_size, buf_size, + &mmap_buf, local_buf); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return rc; } + buf = &mmap_buf.buf; + while (!read_info->eof && buf->end > buf->free) { n = read_info->read(read_info, buf->free, buf->end - buf->free); if (nxt_slow_path(n < 0)) { nxt_unit_req_error(req, "Read error"); - nxt_unit_buf_free(buf); + nxt_unit_free_outgoing_buf(&mmap_buf); return NXT_UNIT_ERROR; } @@ -2180,7 +2411,7 @@ nxt_unit_response_write_cb(nxt_unit_request_info_t *req, buf->free += n; } - rc = nxt_unit_buf_send(buf); + rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0); if (nxt_slow_path(rc != NXT_UNIT_OK)) { nxt_unit_req_error(req, "Failed to send content"); @@ -2325,12 +2556,17 @@ int nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode, uint8_t last, const struct iovec *iov, int iovcnt) { - int i, rc; - size_t l, copy; - uint32_t payload_len, buf_size; - const uint8_t *b; - nxt_unit_buf_t *buf; - nxt_websocket_header_t *wh; + int i, rc; + size_t l, copy; + uint32_t payload_len, buf_size, alloc_size; + const uint8_t *b; + nxt_unit_buf_t *buf; + nxt_unit_mmap_buf_t mmap_buf; + nxt_websocket_header_t *wh; + nxt_unit_request_info_impl_t *req_impl; + char local_buf[NXT_UNIT_LOCAL_BUF_SIZE]; + + req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); payload_len = 0; @@ -2339,18 +2575,23 @@ nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode, } buf_size = 10 + payload_len; + alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE); - buf = nxt_unit_response_buf_alloc(req, nxt_min(buf_size, - PORT_MMAP_DATA_SIZE)); - if (nxt_slow_path(buf == NULL)) { - nxt_unit_req_error(req, "Failed to allocate buf for content"); - - return NXT_UNIT_ERROR; + rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, + &req->response_port, + alloc_size, alloc_size, + &mmap_buf, local_buf); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return rc; } + buf = &mmap_buf.buf; + buf->start[0] = 0; buf->start[1] = 0; + buf_size -= buf->end - buf->start; + wh = (void *) buf->free; buf->free = nxt_websocket_frame_init(wh, payload_len); @@ -2370,32 +2611,33 @@ nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode, l -= copy; if (l > 0) { - buf_size -= buf->end - buf->start; - - rc = nxt_unit_buf_send(buf); - if (nxt_slow_path(rc != NXT_UNIT_OK)) { - nxt_unit_req_error(req, "Failed to send content"); + if (nxt_fast_path(buf->free > buf->start)) { + rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, + &mmap_buf, 0); - return NXT_UNIT_ERROR; + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return rc; + } } - buf = nxt_unit_response_buf_alloc(req, nxt_min(buf_size, - PORT_MMAP_DATA_SIZE)); - if (nxt_slow_path(buf == NULL)) { - nxt_unit_req_error(req, - "Failed to allocate buf for content"); + alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE); - return NXT_UNIT_ERROR; + rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, + &req->response_port, + alloc_size, alloc_size, + &mmap_buf, local_buf); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return rc; } + + buf_size -= buf->end - buf->start; } } } if (buf->free > buf->start) { - rc = nxt_unit_buf_send(buf); - if (nxt_slow_path(rc != NXT_UNIT_OK)) { - nxt_unit_req_error(req, "Failed to send content"); - } + rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, + &mmap_buf, 0); } return rc; @@ -2437,7 +2679,7 @@ nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws) ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws); - if (ws_impl->retain_buf != NULL || ws_impl->buf->hdr != NULL) { + if (ws_impl->buf->free_ptr != NULL || ws_impl->buf->hdr != NULL) { return NXT_UNIT_OK; } @@ -2454,7 +2696,7 @@ nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws) ws_impl->buf->buf.free = b; ws_impl->buf->buf.end = b + size; - ws_impl->retain_buf = b; + ws_impl->buf->free_ptr = b; return NXT_UNIT_OK; } @@ -2469,15 +2711,23 @@ nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws) static nxt_port_mmap_header_t * nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, - nxt_unit_port_id_t *port_id, nxt_chunk_id_t *c, int n) + nxt_unit_port_id_t *port_id, nxt_chunk_id_t *c, int *n, int min_n) { int res, nchunks, i; + uint32_t outgoing_size; nxt_unit_mmap_t *mm, *mm_end; + nxt_unit_impl_t *lib; nxt_port_mmap_header_t *hdr; + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + pthread_mutex_lock(&process->outgoing.mutex); - mm_end = process->outgoing.elts + process->outgoing.size; +retry: + + outgoing_size = process->outgoing.size; + + mm_end = process->outgoing.elts + outgoing_size; for (mm = process->outgoing.elts; mm < mm_end; mm++) { hdr = mm->hdr; @@ -2491,11 +2741,17 @@ nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, while (nxt_port_mmap_get_free_chunk(hdr->free_map, c)) { nchunks = 1; - while (nchunks < n) { + while (nchunks < *n) { res = nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, *c + nchunks); if (res == 0) { + if (nchunks >= min_n) { + *n = nchunks; + + goto unlock; + } + for (i = 0; i < nchunks; i++) { nxt_port_mmap_set_chunk_free(hdr->free_map, *c + i); } @@ -2508,23 +2764,155 @@ nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, nchunks++; } - if (nchunks == n) { + if (nchunks >= min_n) { + *n = nchunks; + goto unlock; } } + + hdr->oosm = 1; + } + + if (outgoing_size >= lib->shm_mmap_limit) { + /* Cannot allocate more shared memory. */ + pthread_mutex_unlock(&process->outgoing.mutex); + + if (min_n == 0) { + *n = 0; + } + + if (nxt_slow_path(process->outgoing.allocated_chunks + min_n + >= lib->shm_mmap_limit * PORT_MMAP_CHUNK_COUNT)) + { + /* Memory allocated by application, but not send to router. */ + return NULL; + } + + /* Notify router about OOSM condition. */ + + res = nxt_unit_send_oosm(ctx, port_id); + if (nxt_slow_path(res != NXT_UNIT_OK)) { + return NULL; + } + + /* Return if caller can handle OOSM condition. Non-blocking mode. */ + + if (min_n == 0) { + return NULL; + } + + nxt_unit_debug(ctx, "oosm: waiting for ACK"); + + res = nxt_unit_wait_shm_ack(ctx); + if (nxt_slow_path(res != NXT_UNIT_OK)) { + return NULL; + } + + nxt_unit_debug(ctx, "oosm: retry"); + + pthread_mutex_lock(&process->outgoing.mutex); + + goto retry; } *c = 0; - hdr = nxt_unit_new_mmap(ctx, process, port_id, n); + hdr = nxt_unit_new_mmap(ctx, process, port_id, *n); unlock: + nxt_atomic_fetch_add(&process->outgoing.allocated_chunks, *n); + + nxt_unit_debug(ctx, "process %d allocated_chunks %d", + process->pid, + process->outgoing.allocated_chunks); + pthread_mutex_unlock(&process->outgoing.mutex); return hdr; } +static int +nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) +{ + ssize_t res; + nxt_port_msg_t msg; + nxt_unit_impl_t *lib; + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + + msg.stream = 0; + msg.pid = lib->pid; + msg.reply_port = 0; + msg.type = _NXT_PORT_MSG_OOSM; + msg.last = 0; + msg.mmap = 0; + msg.nf = 0; + msg.mf = 0; + msg.tracking = 0; + + res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0); + if (nxt_slow_path(res != sizeof(msg))) { + nxt_unit_warn(ctx, "failed to send oosm to %d: %s (%d)", + (int) port_id->pid, strerror(errno), errno); + + return NXT_UNIT_ERROR; + } + + return NXT_UNIT_OK; +} + + +static int +nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx) +{ + nxt_port_msg_t *port_msg; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_read_buf_t *rbuf; + + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + while (1) { + rbuf = nxt_unit_read_buf_get(ctx); + if (nxt_slow_path(rbuf == NULL)) { + return NXT_UNIT_ERROR; + } + + nxt_unit_read_buf(ctx, rbuf); + if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) { + nxt_unit_read_buf_release(ctx, rbuf); + + return NXT_UNIT_ERROR; + } + + port_msg = (nxt_port_msg_t *) rbuf->buf; + + if (port_msg->type == _NXT_PORT_MSG_SHM_ACK) { + nxt_unit_read_buf_release(ctx, rbuf); + + break; + } + + pthread_mutex_lock(&ctx_impl->mutex); + + *ctx_impl->pending_read_tail = rbuf; + ctx_impl->pending_read_tail = &rbuf->next; + rbuf->next = NULL; + + pthread_mutex_unlock(&ctx_impl->mutex); + + if (port_msg->type == _NXT_PORT_MSG_QUIT) { + nxt_unit_debug(ctx, "oosm: quit received"); + + return NXT_UNIT_ERROR; + } + } + + return NXT_UNIT_OK; +} + + static nxt_unit_mmap_t * nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i) { @@ -2759,17 +3147,55 @@ nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd) static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, - nxt_unit_port_id_t *port_id, uint32_t size, - nxt_unit_mmap_buf_t *mmap_buf) + nxt_unit_port_id_t *port_id, uint32_t size, uint32_t min_size, + nxt_unit_mmap_buf_t *mmap_buf, char *local_buf) { - uint32_t nchunks; + int nchunks, min_nchunks; nxt_chunk_id_t c; nxt_port_mmap_header_t *hdr; + if (size <= NXT_UNIT_MAX_PLAIN_SIZE) { + if (local_buf != NULL) { + mmap_buf->free_ptr = NULL; + mmap_buf->plain_ptr = local_buf; + + } else { + mmap_buf->free_ptr = malloc(size + sizeof(nxt_port_msg_t)); + if (nxt_slow_path(mmap_buf->free_ptr == NULL)) { + return NXT_UNIT_ERROR; + } + + mmap_buf->plain_ptr = mmap_buf->free_ptr; + } + + mmap_buf->hdr = NULL; + mmap_buf->buf.start = mmap_buf->plain_ptr + sizeof(nxt_port_msg_t); + mmap_buf->buf.free = mmap_buf->buf.start; + mmap_buf->buf.end = mmap_buf->buf.start + size; + mmap_buf->port_id = *port_id; + mmap_buf->process = process; + + nxt_unit_debug(ctx, "outgoing plain buffer allocation: (%p, %d)", + mmap_buf->buf.start, (int) size); + + return NXT_UNIT_OK; + } + nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE; + min_nchunks = (min_size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE; - hdr = nxt_unit_mmap_get(ctx, process, port_id, &c, nchunks); + hdr = nxt_unit_mmap_get(ctx, process, port_id, &c, &nchunks, min_nchunks); if (nxt_slow_path(hdr == NULL)) { + if (nxt_fast_path(min_nchunks == 0 && nchunks == 0)) { + mmap_buf->hdr = NULL; + mmap_buf->buf.start = NULL; + mmap_buf->buf.free = NULL; + mmap_buf->buf.end = NULL; + mmap_buf->free_ptr = NULL; + + return NXT_UNIT_OK; + } + return NXT_UNIT_ERROR; } @@ -2778,6 +3204,8 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, mmap_buf->buf.free = mmap_buf->buf.start; mmap_buf->buf.end = mmap_buf->buf.start + nchunks * PORT_MMAP_CHUNK_SIZE; mmap_buf->port_id = *port_id; + mmap_buf->process = process; + mmap_buf->free_ptr = NULL; nxt_unit_debug(ctx, "outgoing mmap allocation: (%d,%d,%d)", (int) hdr->id, (int) c, @@ -2880,6 +3308,7 @@ nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps) mmaps->size = 0; mmaps->cap = 0; mmaps->elts = NULL; + mmaps->allocated_chunks = 0; } @@ -3020,6 +3449,22 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) incoming_tail = &recv_msg->incoming_buf; + for (; mmap_msg < end; mmap_msg++) { + b = nxt_unit_mmap_buf_get(ctx); + if (nxt_slow_path(b == NULL)) { + nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf", + recv_msg->stream); + + return NXT_UNIT_ERROR; + } + + nxt_unit_mmap_buf_insert(incoming_tail, b); + incoming_tail = &b->next; + } + + b = recv_msg->incoming_buf; + mmap_msg = recv_msg->start; + pthread_mutex_lock(&process->incoming.mutex); for (; mmap_msg < end; mmap_msg++) { @@ -3043,25 +3488,13 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) recv_msg->size = size; } - b = nxt_unit_mmap_buf_get(ctx); - if (nxt_slow_path(b == NULL)) { - pthread_mutex_unlock(&process->incoming.mutex); - - nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf", - recv_msg->stream); - - nxt_unit_mmap_release(hdr, start, size); - - return NXT_UNIT_ERROR; - } - - nxt_unit_mmap_buf_insert(incoming_tail, b); - incoming_tail = &b->next; - b->buf.start = start; b->buf.free = start; b->buf.end = b->buf.start + size; b->hdr = hdr; + b->process = process; + + b = b->next; nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)", recv_msg->stream, @@ -3077,23 +3510,79 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) } -static int -nxt_unit_mmap_release(nxt_port_mmap_header_t *hdr, void *start, uint32_t size) +static void +nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, + nxt_unit_process_t *process, nxt_port_mmap_header_t *hdr, + void *start, uint32_t size) { - u_char *p, *end; - nxt_chunk_id_t c; + int freed_chunks; + u_char *p, *end; + nxt_chunk_id_t c; + nxt_unit_impl_t *lib; memset(start, 0xA5, size); p = start; end = p + size; c = nxt_port_mmap_chunk_id(hdr, p); + freed_chunks = 0; while (p < end) { nxt_port_mmap_set_chunk_free(hdr->free_map, c); p += PORT_MMAP_CHUNK_SIZE; c++; + freed_chunks++; + } + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + + if (hdr->src_pid == lib->pid && freed_chunks != 0) { + nxt_atomic_fetch_add(&process->outgoing.allocated_chunks, + -freed_chunks); + + nxt_unit_debug(ctx, "process %d allocated_chunks %d", + process->pid, + process->outgoing.allocated_chunks); + } + + if (hdr->dst_pid == lib->pid + && freed_chunks != 0 + && nxt_atomic_cmp_set(&hdr->oosm, 1, 0)) + { + nxt_unit_send_shm_ack(ctx, hdr->src_pid); + } +} + + +static int +nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid) +{ + ssize_t res; + nxt_port_msg_t msg; + nxt_unit_impl_t *lib; + nxt_unit_port_id_t port_id; + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + + nxt_unit_port_id_init(&port_id, pid, 0); + + msg.stream = 0; + msg.pid = lib->pid; + msg.reply_port = 0; + msg.type = _NXT_PORT_MSG_SHM_ACK; + msg.last = 0; + msg.mmap = 0; + msg.nf = 0; + msg.mf = 0; + msg.tracking = 0; + + res = lib->callbacks.port_send(ctx, &port_id, &msg, sizeof(msg), NULL, 0); + if (nxt_slow_path(res != sizeof(msg))) { + nxt_unit_warn(ctx, "failed to send ack to %d: %s (%d)", + (int) port_id.pid, strerror(errno), errno); + + return NXT_UNIT_ERROR; } return NXT_UNIT_OK; @@ -3255,43 +3744,76 @@ int nxt_unit_run_once(nxt_unit_ctx_t *ctx) { int rc; - char buf[4096]; - char oob[256]; - ssize_t rsize; - nxt_unit_impl_t *lib; nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_read_buf_t *rbuf; - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); - memset(oob, 0, sizeof(struct cmsghdr)); + pthread_mutex_lock(&ctx_impl->mutex); + + if (ctx_impl->pending_read_head != NULL) { + rbuf = ctx_impl->pending_read_head; + ctx_impl->pending_read_head = rbuf->next; + + if (ctx_impl->pending_read_tail == &rbuf->next) { + ctx_impl->pending_read_tail = &ctx_impl->pending_read_head; + } + + pthread_mutex_unlock(&ctx_impl->mutex); - if (ctx_impl->read_port_fd != -1) { - rsize = nxt_unit_port_recv(ctx, ctx_impl->read_port_fd, - buf, sizeof(buf), - oob, sizeof(oob)); } else { - rsize = lib->callbacks.port_recv(ctx, &ctx_impl->read_port_id, - buf, sizeof(buf), - oob, sizeof(oob)); + rbuf = nxt_unit_read_buf_get_impl(ctx_impl); + if (nxt_slow_path(rbuf == NULL)) { + return NXT_UNIT_ERROR; + } + + nxt_unit_read_buf(ctx, rbuf); } - if (nxt_fast_path(rsize > 0)) { - rc = nxt_unit_process_msg(ctx, &ctx_impl->read_port_id, buf, rsize, - oob, sizeof(oob)); + if (nxt_fast_path(rbuf->size > 0)) { + rc = nxt_unit_process_msg(ctx, &ctx_impl->read_port_id, + rbuf->buf, rbuf->size, + rbuf->oob, sizeof(rbuf->oob)); #if (NXT_DEBUG) - memset(buf, 0xAC, rsize); + memset(rbuf->buf, 0xAC, rbuf->size); #endif } else { rc = NXT_UNIT_ERROR; } + nxt_unit_read_buf_release(ctx, rbuf); + return rc; } +static void +nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) +{ + nxt_unit_impl_t *lib; + nxt_unit_ctx_impl_t *ctx_impl; + + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + memset(rbuf->oob, 0, sizeof(struct cmsghdr)); + + if (ctx_impl->read_port_fd != -1) { + rbuf->size = nxt_unit_port_recv(ctx, ctx_impl->read_port_fd, + rbuf->buf, sizeof(rbuf->buf), + rbuf->oob, sizeof(rbuf->oob)); + + } else { + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + + rbuf->size = lib->callbacks.port_recv(ctx, &ctx_impl->read_port_id, + rbuf->buf, sizeof(rbuf->buf), + rbuf->oob, sizeof(rbuf->oob)); + } +} + + void nxt_unit_done(nxt_unit_ctx_t *ctx) { @@ -3399,12 +3921,12 @@ nxt_unit_ctx_free(nxt_unit_ctx_t *ctx) } nxt_queue_loop; - nxt_unit_mmap_buf_remove(&ctx_impl->ctx_buf[0]); - nxt_unit_mmap_buf_remove(&ctx_impl->ctx_buf[1]); + nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[0]); + nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[1]); while (ctx_impl->free_buf != NULL) { mmap_buf = ctx_impl->free_buf; - nxt_unit_mmap_buf_remove(mmap_buf); + nxt_unit_mmap_buf_unlink(mmap_buf); free(mmap_buf); } diff --git a/src/nxt_unit.h b/src/nxt_unit.h index 3471a758..c8aaa124 100644 --- a/src/nxt_unit.h +++ b/src/nxt_unit.h @@ -137,6 +137,9 @@ struct nxt_unit_callbacks_s { /* Gracefully quit the application. Optional. */ void (*quit)(nxt_unit_ctx_t *); + /* Shared memory release acknowledgement. */ + void (*shm_ack_handler)(nxt_unit_ctx_t *); + /* Send data and control to process pid using port id. Optional. */ ssize_t (*port_send)(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id, const void *buf, size_t buf_size, @@ -155,6 +158,7 @@ struct nxt_unit_init_s { int max_pending_requests; uint32_t request_data_size; + uint32_t shm_limit; nxt_unit_callbacks_t callbacks; @@ -322,6 +326,9 @@ uint32_t nxt_unit_buf_min(void); int nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start, size_t size); +ssize_t nxt_unit_response_write_nb(nxt_unit_request_info_t *req, + const void *start, size_t size, size_t min_size); + int nxt_unit_response_write_cb(nxt_unit_request_info_t *req, nxt_unit_read_info_t *read_info); diff --git a/src/nxt_worker_process.c b/src/nxt_worker_process.c index c068cca4..754e2ea8 100644 --- a/src/nxt_worker_process.c +++ b/src/nxt_worker_process.c @@ -19,25 +19,6 @@ static void nxt_worker_process_sigterm_handler(nxt_task_t *task, void *obj, static void nxt_worker_process_sigquit_handler(nxt_task_t *task, void *obj, void *data); -nxt_port_handlers_t nxt_app_process_port_handlers = { - .new_port = nxt_port_new_port_handler, - .change_file = nxt_port_change_log_file_handler, - .mmap = nxt_port_mmap_handler, - .remove_pid = nxt_port_remove_pid_handler, -}; - - -nxt_port_handlers_t nxt_discovery_process_port_handlers = { - .quit = nxt_worker_process_quit_handler, - .new_port = nxt_port_new_port_handler, - .change_file = nxt_port_change_log_file_handler, - .mmap = nxt_port_mmap_handler, - .data = nxt_port_data_handler, - .remove_pid = nxt_port_remove_pid_handler, - .rpc_ready = nxt_port_rpc_handler, - .rpc_error = nxt_port_rpc_handler, -}; - const nxt_sig_event_t nxt_worker_process_signals[] = { nxt_event_signal(SIGHUP, nxt_worker_process_signal_handler), diff --git a/src/perl/nxt_perl_psgi.c b/src/perl/nxt_perl_psgi.c index b99d3269..16159b5b 100644 --- a/src/perl/nxt_perl_psgi.c +++ b/src/perl/nxt_perl_psgi.c @@ -1156,6 +1156,7 @@ nxt_perl_psgi_init(nxt_task_t *task, nxt_common_app_conf_t *conf) perl_init.callbacks.request_handler = nxt_perl_psgi_request_handler; perl_init.data = &module; + perl_init.shm_limit = conf->shm_limit; unit_ctx = nxt_unit_init(&perl_init); if (nxt_slow_path(unit_ctx == NULL)) { diff --git a/src/ruby/nxt_ruby.c b/src/ruby/nxt_ruby.c index 45d7a7aa..e4b30319 100644 --- a/src/ruby/nxt_ruby.c +++ b/src/ruby/nxt_ruby.c @@ -134,6 +134,7 @@ nxt_ruby_init(nxt_task_t *task, nxt_common_app_conf_t *conf) nxt_unit_default_init(task, &ruby_unit_init); ruby_unit_init.callbacks.request_handler = nxt_ruby_request_handler; + ruby_unit_init.shm_limit = conf->shm_limit; unit_ctx = nxt_unit_init(&ruby_unit_init); if (nxt_slow_path(unit_ctx == NULL)) { diff --git a/src/test/nxt_clone_test.c b/src/test/nxt_clone_test.c new file mode 100644 index 00000000..15d36557 --- /dev/null +++ b/src/test/nxt_clone_test.c @@ -0,0 +1,601 @@ +/* + * Copyright (C) NGINX, Inc. + * Copyright (C) Valentin V. Bartenev + */ + +#include <nxt_main.h> +#include <nxt_conf.h> +#include "nxt_tests.h" + + +#define UIDMAP 1 +#define GIDMAP 2 + + +typedef struct { + nxt_int_t map_type; + nxt_str_t map_data; + nxt_int_t setid; + nxt_credential_t creds; + nxt_uid_t unit_euid; + nxt_gid_t unit_egid; + nxt_int_t result; + nxt_str_t errmsg; +} nxt_clone_creds_testcase_t; + +typedef struct { + nxt_clone_creds_testcase_t *tc; +} nxt_clone_creds_ctx_t; + + +nxt_int_t nxt_clone_test_mappings(nxt_task_t *task, nxt_mp_t *mp, + nxt_clone_creds_ctx_t *ctx, nxt_clone_creds_testcase_t *tc); +void nxt_cdecl nxt_clone_test_log_handler(nxt_uint_t level, nxt_log_t *log, + const char *fmt, ...); +nxt_int_t nxt_clone_test_map_assert(nxt_task_t *task, + nxt_clone_creds_testcase_t *tc, nxt_clone_credential_map_t *map); +static nxt_int_t nxt_clone_test_parse_map(nxt_task_t *task, + nxt_str_t *map_str, nxt_clone_credential_map_t *map); + + +nxt_log_t *test_log; + +static nxt_gid_t gids[] = {1000, 10000, 60000}; + +static nxt_clone_creds_testcase_t testcases[] = { + { + /* + * Unprivileged unit + * + * if no uid mapping and app creds and unit creds are the same, + * then we automatically add a map for the creds->uid. + * Then, child process can safely setuid(creds->uid) in + * the new namespace. + */ + UIDMAP, + nxt_string(""), + 0, + {"nobody", 65534, 65534, 0, NULL}, + 1000, 1000, + NXT_OK, + nxt_string("") + }, + { + UIDMAP, + nxt_string(""), + 0, + {"johndoe", 10000, 10000, 0, NULL}, + 1000, 1000, + NXT_OK, + nxt_string("") + }, + { + UIDMAP, + nxt_string("[{\"container\": 1000, \"host\": 1000, \"size\": 1}]"), + 0, + {"johndoe", 1000, 1000, 0, NULL}, + 1000, 1000, + NXT_OK, + nxt_string("") + }, + { + UIDMAP, + nxt_string("[{\"container\": 0, \"host\": 1000, \"size\": 1}]"), + 0, + {"root", 0, 0, 0, NULL}, + 1000, 1000, + NXT_OK, + nxt_string("") + }, + { + UIDMAP, + nxt_string("[{\"container\": 65534, \"host\": 1000, \"size\": 1}]"), + 0, + {"nobody", 65534, 0, 0, NULL}, + 1000, 1000, + NXT_OK, + nxt_string("") + }, + { + UIDMAP, + nxt_string("[{\"container\": 0, \"host\": 1000, \"size\": 1}," + " {\"container\": 1000, \"host\": 2000, \"size\": 1}]"), + 0, + {"root", 0, 0, 0, NULL}, + 1000, 1000, + NXT_ERROR, + nxt_string("\"uidmap\" field has 2 entries but unprivileged unit has " + "a maximum of 1 map.") + }, + { + UIDMAP, + nxt_string("[{\"container\": 0, \"host\": 1000, \"size\": 1}," + " {\"container\": 1000, \"host\": 2000, \"size\": 1}]"), + 1, /* privileged */ + {"root", 0, 0, 0, NULL}, + 1000, 1000, + NXT_OK, + nxt_string("") + }, + { + UIDMAP, + nxt_string("[{\"container\": 0, \"host\": 1000, \"size\": 1000}," + " {\"container\": 1000, \"host\": 2000, \"size\": 1000}]"), + 1, /* privileged */ + {"johndoe", 500, 0, 0, NULL}, + 1000, 1000, + NXT_OK, + nxt_string("") + }, + { + UIDMAP, + nxt_string("[{\"container\": 0, \"host\": 1000, \"size\": 1000}," + " {\"container\": 1000, \"host\": 2000, \"size\": 1000}]"), + 1, /* privileged */ + {"johndoe", 1000, 0, 0, NULL}, + 1000, 1000, + NXT_OK, + nxt_string("") + }, + { + UIDMAP, + nxt_string("[{\"container\": 0, \"host\": 1000, \"size\": 1000}," + " {\"container\": 1000, \"host\": 2000, \"size\": 1000}]"), + 1, /* privileged */ + {"johndoe", 1500, 0, 0, NULL}, + 1000, 1000, + NXT_OK, + nxt_string("") + }, + { + UIDMAP, + nxt_string("[{\"container\": 0, \"host\": 1000, \"size\": 1000}," + " {\"container\": 1000, \"host\": 2000, \"size\": 1000}]"), + 1, /* privileged */ + {"johndoe", 1999, 0, 0, NULL}, + 1000, 1000, + NXT_OK, + nxt_string("") + }, + { + UIDMAP, + nxt_string("[{\"container\": 0, \"host\": 1000, \"size\": 1000}," + " {\"container\": 1000, \"host\": 2000, \"size\": 1000}]"), + 1, /* privileged */ + {"johndoe", 2000, 0, 0, NULL}, + 1000, 1000, + NXT_ERROR, + nxt_string("\"uidmap\" field has no \"container\" entry for user " + "\"johndoe\" (uid 2000)") + }, + { + /* + * Unprivileged unit + * + * if no gid mapping and app creds and unit creds are the same, + * then we automatically add a map for the creds->base_gid. + * Then, child process can safely setgid(creds->base_gid) in + * the new namespace. + */ + GIDMAP, + nxt_string("[]"), + 0, + {"nobody", 65534, 65534, 0, NULL}, + 1000, 1000, + NXT_OK, + nxt_string("") + }, + { + /* + * Unprivileged unit + * + * Inside the new namespace, we can have any gid but it + * should map to parent gid (in this case 1000) in parent + * namespace. + */ + GIDMAP, + nxt_string("[{\"container\": 0, \"host\": 1000, \"size\": 1}]"), + 0, + {"root", 0, 0, 0, NULL}, + 1000, 1000, + NXT_OK, + nxt_string("") + }, + { + GIDMAP, + nxt_string("[{\"container\": 65534, \"host\": 1000, \"size\": 1}]"), + 0, + {"nobody", 65534, 65534, 0, NULL}, + 1000, 1000, + NXT_OK, + nxt_string("") + }, + { + /* + * Unprivileged unit + * + * There's no mapping for "johndoe" (gid 1000) inside the namespace. + */ + GIDMAP, + nxt_string("[{\"container\": 65535, \"host\": 1000, \"size\": 1}]"), + 0, + {"johndoe", 1000, 1000, 0, NULL}, + 1000, 1000, + NXT_ERROR, + nxt_string("\"gidmap\" field has no \"container\" entry for " + "gid 1000.") + }, + { + GIDMAP, + nxt_string("[{\"container\": 1000, \"host\": 1000, \"size\": 2}]"), + 0, + {"johndoe", 1000, 1000, 0, NULL}, + 1000, 1000, + NXT_ERROR, + nxt_string("\"gidmap\" field has an entry with \"size\": 2, but " + "for unprivileged unit it must be 1.") + }, + { + GIDMAP, + nxt_string("[{\"container\": 1000, \"host\": 1001, \"size\": 1}]"), + 0, + {"johndoe", 1000, 1000, 0, NULL}, + 1000, 1000, + NXT_ERROR, + nxt_string("\"gidmap\" field has an entry for host gid 1001 but " + "unprivileged unit can only map itself (gid 1000) " + "into child namespaces.") + }, + { + GIDMAP, + nxt_string("[{\"container\": 1000, \"host\": 1000, \"size\": 1}]"), + 0, + {"johndoe", 1000, 1000, 3, gids}, + 1000, 1000, + NXT_ERROR, + nxt_string("unprivileged unit disallow supplementary groups for " + "new namespace (user \"johndoe\" has 3 groups).") + }, + + /* privileged unit */ + + /* not root with capabilities */ + { + GIDMAP, + nxt_string("[]"), + 1, + {"johndoe", 1000, 1000, 0, NULL}, + 1000, 1000, + NXT_OK, + nxt_string("") + }, + { + GIDMAP, + nxt_string(""), + 1, + {"johndoe", 1000, 1000, 0, NULL}, + 1000, 1000, + NXT_OK, + nxt_string("") + }, + { + /* missing gid of {"user": "nobody"} */ + GIDMAP, + nxt_string("[{\"container\": 0, \"host\": 1000, \"size\": 1}]"), + 1, + {"nobody", 65534, 65534, 0, NULL}, + 1000, 1000, + NXT_ERROR, + nxt_string("\"gidmap\" field has no \"container\" entry for " + "gid 65534.") + }, + { + /* solves the previous by mapping 65534 gids */ + GIDMAP, + nxt_string("[{\"container\": 0, \"host\": 1000, \"size\": 65535}]"), + 1, + {"nobody", 65534, 65534, 0, NULL}, + 1000, 1000, + NXT_OK, + nxt_string("") + }, + { + /* solves by adding a separate mapping */ + GIDMAP, + nxt_string("[{\"container\": 0, \"host\": 1000, \"size\": 1}," + " {\"container\": 65534, \"host\": 1000, \"size\": 1}]"), + 1, + {"nobody", 65534, 65534, 0, NULL}, + 1000, 1000, + NXT_OK, + nxt_string("") + }, + { + /* + * Map a big range + */ + GIDMAP, + nxt_string("[{\"container\": 0, \"host\": 0, \"size\": 200000}]"), + 1, + {"johndoe", 100000, 100000, 0, NULL}, + 1000, 1000, + NXT_OK, + nxt_string("") + }, + { + /* + * Validate if supplementary groups are mapped + */ + GIDMAP, + nxt_string("[]"), + 1, + {"johndoe", 1000, 1000, 3, gids}, + 1000, 1000, + NXT_ERROR, + nxt_string("\"gidmap\" field has no entries but user \"johndoe\" " + "has 3 suplementary groups."), + }, + { + GIDMAP, + nxt_string("[{\"container\": 0, \"host\": 0, \"size\": 1}]"), + 1, + {"johndoe", 1000, 1000, 3, gids}, + 1000, 1000, + NXT_ERROR, + nxt_string("\"gidmap\" field has no \"container\" entry for " + "gid 1000."), + }, + { + GIDMAP, + nxt_string("[{\"container\": 1000, \"host\": 0, \"size\": 1}]"), + 1, + {"johndoe", 1000, 1000, 3, gids}, + 1000, 1000, + NXT_ERROR, + nxt_string("\"gidmap\" field has missing suplementary gid mappings " + "(found 1 out of 3)."), + }, + { + GIDMAP, + nxt_string("[{\"container\": 1000, \"host\": 0, \"size\": 1}," + " {\"container\": 10000, \"host\": 10000, \"size\": 1}]"), + 1, + {"johndoe", 1000, 1000, 3, gids}, + 1000, 1000, + NXT_ERROR, + nxt_string("\"gidmap\" field has missing suplementary gid mappings " + "(found 2 out of 3)."), + }, + { + /* + * Fix all mappings + */ + GIDMAP, + nxt_string("[{\"container\": 1000, \"host\": 0, \"size\": 1}," + "{\"container\": 10000, \"host\": 10000, \"size\": 1}," + " {\"container\": 60000, \"host\": 60000, \"size\": 1}]"), + 1, + {"johndoe", 1000, 1000, 3, gids}, + 1000, 1000, + NXT_OK, + nxt_string(""), + }, +}; + + +void nxt_cdecl +nxt_clone_test_log_handler(nxt_uint_t level, nxt_log_t *log, + const char *fmt, ...) +{ + u_char *p, *end; + va_list args; + nxt_clone_creds_ctx_t *ctx; + nxt_clone_creds_testcase_t *tc; + u_char msg[NXT_MAX_ERROR_STR]; + + p = msg; + end = msg + NXT_MAX_ERROR_STR; + + ctx = log->ctx; + tc = ctx->tc; + + va_start(args, fmt); + p = nxt_vsprintf(p, end, fmt, args); + va_end(args); + + *p++ = '\0'; + + if (tc->result == NXT_OK && level == NXT_LOG_DEBUG) { + return; + } + + if (tc->errmsg.length == 0) { + nxt_log_error(NXT_LOG_ERR, &nxt_main_log, "unexpected log: %s", msg); + return; + } + + if (!nxt_str_eq(&tc->errmsg, msg, (nxt_uint_t) (p - msg - 1))) { + nxt_log_error(NXT_LOG_ERR, &nxt_main_log, + "error log mismatch: got [%s] but wants [%V]", + msg, &tc->errmsg); + return; + } +} + + +nxt_int_t +nxt_clone_creds_test(nxt_thread_t *thr) +{ + nxt_mp_t *mp; + nxt_int_t ret; + nxt_uint_t count, i; + nxt_task_t *task; + nxt_runtime_t rt; + nxt_clone_creds_ctx_t ctx; + + nxt_log_t nxt_clone_creds_log = { + NXT_LOG_INFO, + 0, + nxt_clone_test_log_handler, + NULL, + &ctx + }; + + nxt_thread_time_update(thr); + + thr->runtime = &rt; + + task = thr->task; + + mp = nxt_mp_create(1024, 128, 256, 32); + if (mp == NULL) { + return NXT_ERROR; + } + + rt.mem_pool = mp; + + test_log = task->log; + task->log = &nxt_clone_creds_log; + task->thread = thr; + + count = sizeof(testcases)/sizeof(nxt_clone_creds_testcase_t); + + for (i = 0; i < count; i++) { + ret = nxt_clone_test_mappings(task, mp, &ctx, &testcases[i]); + + if (ret != NXT_OK) { + goto fail; + } + } + + ret = NXT_OK; + + nxt_log_error(NXT_LOG_NOTICE, test_log, "clone creds test passed"); + +fail: + task->log = test_log; + nxt_mp_destroy(mp); + + return ret; +} + + +nxt_int_t +nxt_clone_test_mappings(nxt_task_t *task, nxt_mp_t *mp, + nxt_clone_creds_ctx_t *ctx, nxt_clone_creds_testcase_t *tc) +{ + nxt_int_t ret; + nxt_runtime_t *rt; + nxt_clone_credential_map_t map; + + rt = task->thread->runtime; + + map.size = 0; + + if (tc->map_data.length > 0) { + ret = nxt_clone_test_parse_map(task, &tc->map_data, &map); + if (ret != NXT_OK) { + return NXT_ERROR; + } + } + + rt->capabilities.setid = tc->setid; + + nxt_euid = tc->unit_euid; + nxt_egid = tc->unit_egid; + + ctx->tc = tc; + + if (nxt_clone_test_map_assert(task, tc, &map) != NXT_OK) { + return NXT_ERROR; + } + + if (tc->setid && nxt_euid != 0) { + /* + * Running as root should have the same behavior as + * passing Linux capabilities. + */ + + nxt_euid = 0; + nxt_egid = 0; + + if (nxt_clone_test_map_assert(task, tc, &map) != NXT_OK) { + return NXT_ERROR; + } + } + + return NXT_OK; +} + + +nxt_int_t +nxt_clone_test_map_assert(nxt_task_t *task, nxt_clone_creds_testcase_t *tc, + nxt_clone_credential_map_t *map) +{ + nxt_int_t ret; + + if (tc->map_type == UIDMAP) { + ret = nxt_clone_vldt_credential_uidmap(task, map, &tc->creds); + } else { + ret = nxt_clone_vldt_credential_gidmap(task, map, &tc->creds); + } + + if (ret != tc->result) { + nxt_log_error(NXT_LOG_ERR, &nxt_main_log, + "return %d instead of %d (map: %V)", ret, tc->result, + &tc->map_data); + + return NXT_ERROR; + } + + return NXT_OK; +} + + +static nxt_int_t +nxt_clone_test_parse_map(nxt_task_t *task, nxt_str_t *map_str, + nxt_clone_credential_map_t *map) +{ + nxt_uint_t i; + nxt_runtime_t *rt; + nxt_conf_value_t *array, *obj, *value; + + static nxt_str_t host_name = nxt_string("host"); + static nxt_str_t cont_name = nxt_string("container"); + static nxt_str_t size_name = nxt_string("size"); + + rt = task->thread->runtime; + + array = nxt_conf_json_parse_str(rt->mem_pool, map_str); + if (array == NULL) { + return NXT_ERROR; + } + + map->size = nxt_conf_array_elements_count(array); + + if (map->size == 0) { + return NXT_OK; + } + + map->map = nxt_mp_alloc(rt->mem_pool, + map->size * sizeof(nxt_clone_map_entry_t)); + + if (map->map == NULL) { + return NXT_ERROR; + } + + for (i = 0; i < map->size; i++) { + obj = nxt_conf_get_array_element(array, i); + + value = nxt_conf_get_object_member(obj, &host_name, NULL); + map->map[i].host = nxt_conf_get_integer(value); + + value = nxt_conf_get_object_member(obj, &cont_name, NULL); + map->map[i].container = nxt_conf_get_integer(value); + + value = nxt_conf_get_object_member(obj, &size_name, NULL); + map->map[i].size = nxt_conf_get_integer(value); + } + + return NXT_OK; +} diff --git a/src/test/nxt_tests.c b/src/test/nxt_tests.c index 7cba0f69..901d76c3 100644 --- a/src/test/nxt_tests.c +++ b/src/test/nxt_tests.c @@ -162,5 +162,11 @@ main(int argc, char **argv) return 1; } +#if (NXT_HAVE_CLONE_NEWUSER) + if (nxt_clone_creds_test(thr) != NXT_OK) { + return 1; + } +#endif + return 0; } diff --git a/src/test/nxt_tests.h b/src/test/nxt_tests.h index be4168cf..d531cc7d 100644 --- a/src/test/nxt_tests.h +++ b/src/test/nxt_tests.h @@ -64,6 +64,7 @@ nxt_int_t nxt_malloc_test(nxt_thread_t *thr); nxt_int_t nxt_utf8_test(nxt_thread_t *thr); nxt_int_t nxt_http_parse_test(nxt_thread_t *thr); nxt_int_t nxt_strverscmp_test(nxt_thread_t *thr); +nxt_int_t nxt_clone_creds_test(nxt_thread_t *thr); #endif /* _NXT_TESTS_H_INCLUDED_ */ |